Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/18 17:49:00 UTC

[11/13] flink git commit: [FLINK-1350] [runtime] Add blocking result partition variant
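
This patch splits intermediate results into two runtime variants: PIPELINED partitions, which consumers may read while they are still being produced, and BLOCKING partitions, which become consumable only once every parallel producer has finished, enabling batch-wise scheduling of the receiving tasks. A minimal, self-contained sketch of that consumability rule, mirroring the IntermediateResult.isConsumable() logic added further down (illustrative only, not part of the patch):

    import java.util.concurrent.atomic.AtomicInteger;

    public class ConsumabilityRule {

        // Illustrative two-value form; the real enum is
        // org.apache.flink.runtime.io.network.partition.ResultPartitionType.
        enum ResultPartitionType { PIPELINED, BLOCKING }

        // Pipelined results are consumable immediately; blocking results only
        // after the running-producer countdown has reached zero.
        static boolean isConsumable(ResultPartitionType type, AtomicInteger runningProducers) {
            return type == ResultPartitionType.PIPELINED || runningProducers.get() == 0;
        }

        public static void main(String[] args) {
            AtomicInteger producers = new AtomicInteger(2);
            System.out.println(isConsumable(ResultPartitionType.BLOCKING, producers)); // false
            producers.set(0);
            System.out.println(isConsumable(ResultPartitionType.BLOCKING, producers)); // true
        }
    }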

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index 8daee7c..96b2f55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -22,13 +22,14 @@ import akka.actor.ActorRef;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
-import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -92,9 +93,8 @@ public class RuntimeEnvironment implements Environment, Runnable {
 
 	private final AtomicBoolean canceled = new AtomicBoolean();
 
-	private final IntermediateResultPartition[] producedPartitions;
-
-	private final BufferWriter[] writers;
+	private final ResultPartition[] producedPartitions;
+	private final ResultPartitionWriter[] writers;
 
 	private final SingleInputGate[] inputGates;
 
@@ -116,23 +116,27 @@ public class RuntimeEnvironment implements Environment, Runnable {
 
 		try {
 			// Produced intermediate result partitions
-			final List<PartitionDeploymentDescriptor> partitions = tdd.getProducedPartitions();
+			final List<ResultPartitionDeploymentDescriptor> partitions = tdd.getProducedPartitions();
 
-			this.producedPartitions = new IntermediateResultPartition[partitions.size()];
-			this.writers = new BufferWriter[partitions.size()];
+			this.producedPartitions = new ResultPartition[partitions.size()];
+			this.writers = new ResultPartitionWriter[partitions.size()];
 
 			for (int i = 0; i < this.producedPartitions.length; i++) {
-				this.producedPartitions[i] = IntermediateResultPartition.create(this, i, owner.getJobID(), owner.getExecutionId(), networkEnvironment, partitions.get(i));
-				writers[i] = new BufferWriter(this.producedPartitions[i]);
+				ResultPartitionDeploymentDescriptor desc = partitions.get(i);
+				ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), owner.getExecutionId());
+
+				this.producedPartitions[i] = new ResultPartition(owner.getJobID(), partitionId, desc.getPartitionType(), desc.getNumberOfSubpartitions(), networkEnvironment, ioManager);
+
+				writers[i] = new ResultPartitionWriter(this.producedPartitions[i]);
 			}
 
 			// Consumed intermediate result partitions
-			final List<PartitionConsumerDeploymentDescriptor> consumedPartitions = tdd.getConsumedPartitions();
+			final List<InputGateDeploymentDescriptor> consumedPartitions = tdd.getInputGates();
 
 			this.inputGates = new SingleInputGate[consumedPartitions.size()];
 
 			for (int i = 0; i < inputGates.length; i++) {
-				inputGates[i] = SingleInputGate.create(networkEnvironment, consumedPartitions.get(i));
+				inputGates[i] = SingleInputGate.create(consumedPartitions.get(i), networkEnvironment);
 
 				// The input gates are organized by key for task updates/channel updates at runtime
 				inputGatesById.put(inputGates[i].getConsumedResultId(), inputGates[i]);
@@ -211,7 +215,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
 
 			// Finish the produced partitions
 			if (producedPartitions != null) {
-				for (IntermediateResultPartition partition : producedPartitions) {
+				for (ResultPartition partition : producedPartitions) {
 					if (partition != null) {
 						partition.finish();
 					}
@@ -340,14 +344,14 @@ public class RuntimeEnvironment implements Environment, Runnable {
 	}
 
 	@Override
-	public BufferWriter getWriter(int index) {
+	public ResultPartitionWriter getWriter(int index) {
 		checkElementIndex(index, writers.length, "Illegal environment writer request.");
 
 		return writers[checkElementIndex(index, writers.length)];
 	}
 
 	@Override
-	public BufferWriter[] getAllWriters() {
+	public ResultPartitionWriter[] getAllWriters() {
 		return writers;
 	}
 
@@ -363,7 +367,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
 		return inputGates;
 	}
 
-	public IntermediateResultPartition[] getProducedPartitions() {
+	public ResultPartition[] getProducedPartitions() {
 		return producedPartitions;
 	}
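
Note on the wiring above: each produced partition is now identified by a composite ResultPartitionID, the key line from the constructor being the pairing of the descriptor's partition ID with the producing execution attempt:

    // A restarted producer gets a new ExecutionAttemptID and therefore publishes
    // under a fresh ResultPartitionID, so data from a failed attempt cannot be
    // confused with data from its successor.
    ResultPartitionID partitionId =
            new ResultPartitionID(desc.getPartitionId(), owner.getExecutionId());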
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cf24b20..eda5bdf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,23 +20,22 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
-import static akka.dispatch.Futures.future;
-
 import akka.dispatch.OnFailure;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.deployment.PartialPartitionInfo;
-import org.apache.flink.runtime.deployment.PartitionInfo;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.io.network.RemoteAddress;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -44,24 +43,23 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
-
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import static com.google.common.base.Preconditions.checkArgument;
+import static akka.dispatch.Futures.future;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELING;
@@ -71,6 +69,12 @@ import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
 import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.CancelTask;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.FailIntermediateResultPartitions;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.SubmitTask;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.UpdateTask;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.UpdateTaskSinglePartitionInfo;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.createUpdateTaskMultiplePartitionInfos;
 
 /**
  * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
@@ -113,7 +117,7 @@ public class Execution implements Serializable {
 
 	private final FiniteDuration timeout;
 
-	private ConcurrentLinkedQueue<PartialPartitionInfo> partialPartitionInfos;
+	private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
 
 	private volatile ExecutionState state = CREATED;
 	
@@ -128,8 +132,6 @@ public class Execution implements Serializable {
 	// --------------------------------------------------------------------------------------------
 	
 	public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, FiniteDuration timeout) {
-		checkArgument(attemptNumber >= 0);
-		
 		this.vertex = checkNotNull(vertex);
 		this.attemptId = new ExecutionAttemptID();
 		
@@ -140,17 +142,17 @@ public class Execution implements Serializable {
 
 		this.timeout = timeout;
 
-		this.partialPartitionInfos = new ConcurrentLinkedQueue<PartialPartitionInfo>();
+		this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor>();
 	}
 	
 	// --------------------------------------------------------------------------------------------
 	//   Properties
 	// --------------------------------------------------------------------------------------------
-	
+
 	public ExecutionVertex getVertex() {
 		return vertex;
 	}
-	
+
 	public ExecutionAttemptID getAttemptId() {
 		return attemptId;
 	}
@@ -158,15 +160,15 @@ public class Execution implements Serializable {
 	public int getAttemptNumber() {
 		return attemptNumber;
 	}
-	
+
 	public ExecutionState getState() {
 		return state;
 	}
-	
+
 	public SimpleSlot getAssignedResource() {
 		return assignedResource;
 	}
-	
+
 	public InstanceConnectionInfo getAssignedResourceLocation() {
 		return assignedResourceLocation;
 	}
@@ -196,8 +198,8 @@ public class Execution implements Serializable {
 		}
 		assignedResource = null;
 
-		partialPartitionInfos.clear();
-		partialPartitionInfos = null;
+		partialInputChannelDeploymentDescriptors.clear();
+		partialInputChannelDeploymentDescriptors = null;
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -226,7 +228,7 @@ public class Execution implements Serializable {
 
 		// sanity check
 		if (locationConstraint != null && sharingGroup == null) {
-			throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing not allowed.");
+			throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing is not allowed.");
 		}
 
 		if (transitionState(CREATED, SCHEDULED)) {
@@ -328,7 +330,7 @@ public class Execution implements Serializable {
 
 			Instance instance = slot.getInstance();
 			Future<Object> deployAction = Patterns.ask(instance.getTaskManager(),
-					new TaskManagerMessages.SubmitTask(deployment), new Timeout(timeout));
+					new SubmitTask(deployment), new Timeout(timeout));
 
 			deployAction.onComplete(new OnComplete<Object>(){
 
@@ -432,22 +434,38 @@ public class Execution implements Serializable {
 		}
 	}
 
-	// TODO This leads to many unnecessary RPC calls in most cases
-	void scheduleOrUpdateConsumers(List<List<ExecutionEdge>> consumers) {
-		if (consumers.size() != 1) {
-			fail(new IllegalStateException("Only one consumer is supported currently."));
+	void scheduleOrUpdateConsumers(List<List<ExecutionEdge>> allConsumers) {
+		if (allConsumers.size() != 1) {
+			fail(new IllegalStateException("Currently, only a single consumer group per partition is supported."));
 		}
 
-		final List<ExecutionEdge> consumer = consumers.get(0);
-
-		for (ExecutionEdge edge : consumer) {
+		for (ExecutionEdge edge : allConsumers.get(0)) {
 			final ExecutionVertex consumerVertex = edge.getTarget();
 
-			final ExecutionState consumerState = consumerVertex.getExecutionState();
+			final Execution consumer = consumerVertex.getCurrentExecutionAttempt();
+			final ExecutionState consumerState = consumer.getState();
 
-			if (consumerState == CREATED) {
-				consumerVertex.cachePartitionInfo(PartialPartitionInfo.fromEdge(edge));
+			final IntermediateResultPartition partition = edge.getSource();
 
+			// ----------------------------------------------------------------
+			// Consumer is created => try to deploy and cache input channel
+			// descriptors if there is a deployment race
+			// ----------------------------------------------------------------
+			if (consumerState == CREATED) {
+				final Execution partitionExecution = partition.getProducer()
+						.getCurrentExecutionAttempt();
+
+				consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(
+						partition, partitionExecution));
+
+				// When deploying a consuming task, its task deployment descriptor will contain all
+				// deployment information available at the respective time. It is possible that some
+				// of the partitions to be consumed have not been created yet. These are updated
+				// runtime via the update messages.
+				//
+				// TODO The current approach may send many update messages even though the consuming
+				// task has already been deployed with all necessary information. We have to check
+				// whether this is a problem and fix it, if it is.
 				future(new Callable<Boolean>(){
 					@Override
 					public Boolean call() throws Exception {
@@ -468,41 +486,64 @@ public class Execution implements Serializable {
 					consumerVertex.sendPartitionInfos();
 				}
 			}
-			else if (consumerState == RUNNING) {
-				SimpleSlot consumerSlot = consumerVertex.getCurrentAssignedResource();
-				ExecutionAttemptID consumerExecutionId = consumerVertex.
-						getCurrentExecutionAttempt().getAttemptId();
-
-				IntermediateResultPartitionID partitionID = edge.getSource().getPartitionId();
-				int connectionIndex = edge.getSource().getIntermediateResult().getConnectionIndex();
-
-				PartitionInfo.PartitionLocation producerLocation;
-				RemoteAddress producerAddress = null;
-
-				if(consumerSlot.getInstance().getInstanceConnectionInfo().equals(
-						getAssignedResourceLocation())) {
-					producerLocation = PartitionInfo.PartitionLocation.LOCAL;
-				} else {
-					producerLocation = PartitionInfo.PartitionLocation.REMOTE;
-					producerAddress = new RemoteAddress(getAssignedResourceLocation(),
-							connectionIndex);
-				}
+			// ----------------------------------------------------------------
+			// Consumer is running => send update message now
+			// ----------------------------------------------------------------
+			else {
+				if (consumerState == RUNNING) {
+					final SimpleSlot consumerSlot = consumer.getAssignedResource();
 
-				PartitionInfo partitionInfo = new PartitionInfo(partitionID, attemptId,
-						producerLocation, producerAddress);
+					if (consumerSlot == null) {
+						// The consumer has been reset concurrently
+						continue;
+					}
 
-				TaskManagerMessages.UpdateTask updateTaskMessage =
-						new TaskManagerMessages.UpdateTaskSinglePartitionInfo(consumerExecutionId,
-								edge.getSource().getIntermediateResult().getId(), partitionInfo);
+					final Instance consumerInstance = consumerSlot.getInstance();
 
-				sendUpdateTaskRpcCall(consumerSlot, updateTaskMessage);
-			}
-			else if (consumerState == SCHEDULED || consumerState == DEPLOYING) {
-				consumerVertex.cachePartitionInfo(PartialPartitionInfo.fromEdge(edge));
+					final ResultPartitionID partitionId = new ResultPartitionID(
+							partition.getPartitionId(), attemptId);
 
-				// double check to resolve race conditions
-				if(consumerVertex.getExecutionState() == RUNNING){
-					consumerVertex.sendPartitionInfos();
+					final Instance partitionInstance = partition.getProducer()
+							.getCurrentAssignedResource().getInstance();
+
+					final ResultPartitionLocation partitionLocation;
+
+					if (consumerInstance.equals(partitionInstance)) {
+						// Consuming task is deployed to the same instance as the partition => local
+						partitionLocation = ResultPartitionLocation.createLocal();
+					}
+					else {
+						// Different instances => remote
+						final ConnectionID connectionId = new ConnectionID(
+								partitionInstance.getInstanceConnectionInfo(),
+								partition.getIntermediateResult().getConnectionIndex());
+
+						partitionLocation = ResultPartitionLocation.createRemote(connectionId);
+					}
+
+					final InputChannelDeploymentDescriptor descriptor = new InputChannelDeploymentDescriptor(
+							partitionId, partitionLocation);
+
+					final UpdateTask updateTaskMessage = new UpdateTaskSinglePartitionInfo(
+							consumer.getAttemptId(), partition.getIntermediateResult().getId(), descriptor);
+
+					sendUpdateTaskRpcCall(consumerSlot, updateTaskMessage);
+				}
+				// ----------------------------------------------------------------
+				// Consumer is scheduled or deploying => cache input channel
+				// deployment descriptors and send update message later
+				// ----------------------------------------------------------------
+				else if (consumerState == SCHEDULED || consumerState == DEPLOYING) {
+					final Execution partitionExecution = partition.getProducer()
+							.getCurrentExecutionAttempt();
+
+					consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor
+							.fromEdge(partition, partitionExecution));
+
+					// double check to resolve race conditions
+					if (consumerVertex.getExecutionState() == RUNNING) {
+						consumerVertex.sendPartitionInfos();
+					}
 				}
 			}
 		}
@@ -543,6 +584,23 @@ public class Execution implements Serializable {
 
 				if (transitionState(current, FINISHED)) {
 					try {
+						if (getVertex().finishAllBlockingPartitions()) {
+
+							IntermediateResult[] allResults = getVertex().getJobVertex()
+									.getProducedDataSets();
+
+							LOG.debug("Finished all produced partitions ({}). Scheduling all receivers " +
+									"of the following datasets: {}.", this, Arrays
+									.toString(allResults));
+
+							// Schedule next batch
+							for (IntermediateResult result : allResults) {
+								for (IntermediateResultPartition p : result.getPartitions()) {
+									scheduleOrUpdateConsumers(p.getConsumers());
+								}
+							}
+						}
+
 						assignedResource.releaseSlot();
 						vertex.getExecutionGraph().deregisterExecution(this);
 					}
@@ -612,28 +670,28 @@ public class Execution implements Serializable {
 		}
 	}
 
-	void cachePartitionInfo(PartialPartitionInfo partitionInfo) {
-		partialPartitionInfos.add(partitionInfo);
+	void cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo) {
+		partialInputChannelDeploymentDescriptors.add(partitionInfo);
 	}
 
 	void sendPartitionInfos() {
 		// check if the ExecutionVertex has already been archived and thus cleared the
 		// partial partition infos queue
-		if(partialPartitionInfos != null && !partialPartitionInfos.isEmpty()) {
+		if (partialInputChannelDeploymentDescriptors != null && !partialInputChannelDeploymentDescriptors.isEmpty()) {
 
-			PartialPartitionInfo partialPartitionInfo;
+			PartialInputChannelDeploymentDescriptor partialInputChannelDeploymentDescriptor;
 
 			List<IntermediateDataSetID> resultIDs = new ArrayList<IntermediateDataSetID>();
-			List<PartitionInfo> partitionInfos = new ArrayList<PartitionInfo>();
+			List<InputChannelDeploymentDescriptor> inputChannelDeploymentDescriptors = new ArrayList<InputChannelDeploymentDescriptor>();
 
-			while ((partialPartitionInfo = partialPartitionInfos.poll()) != null) {
-				resultIDs.add(partialPartitionInfo.getIntermediateDataSetID());
-				partitionInfos.add(partialPartitionInfo.createPartitionInfo(this));
+			while ((partialInputChannelDeploymentDescriptor = partialInputChannelDeploymentDescriptors.poll()) != null) {
+				resultIDs.add(partialInputChannelDeploymentDescriptor.getResultId());
+				inputChannelDeploymentDescriptors.add(partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this));
 			}
 
-			TaskManagerMessages.UpdateTask updateTaskMessage =
-					TaskManagerMessages.createUpdateTaskMultiplePartitionInfos(attemptId, resultIDs,
-							partitionInfos);
+			UpdateTask updateTaskMessage =
+					createUpdateTaskMultiplePartitionInfos(attemptId, resultIDs,
+							inputChannelDeploymentDescriptors);
 
 			sendUpdateTaskRpcCall(assignedResource, updateTaskMessage);
 		}
@@ -752,7 +810,7 @@ public class Execution implements Serializable {
 		if (slot != null) {
 
 			Future<Object> cancelResult = AkkaUtils.retry(slot.getInstance().getTaskManager(), new
-							TaskManagerMessages.CancelTask(attemptId), NUM_CANCEL_CALL_TRIES,
+							CancelTask(attemptId), NUM_CANCEL_CALL_TRIES,
 					AkkaUtils.globalExecutionContext(), timeout);
 
 			cancelResult.onComplete(new OnComplete<Object>() {
@@ -782,7 +840,7 @@ public class Execution implements Serializable {
 			if (instance.isAlive()) {
 				try {
 					// TODO For some tests this could be a problem when querying too early if all resources were released
-					instance.getTaskManager().tell(new TaskManagerMessages.FailIntermediateResultPartitions(attemptId), ActorRef.noSender());
+					instance.getTaskManager().tell(new FailIntermediateResultPartitions(attemptId), ActorRef.noSender());
 				} catch (Throwable t) {
 					fail(new Exception("Intermediate result partition could not be failed.", t));
 				}
@@ -791,7 +849,8 @@ public class Execution implements Serializable {
 	}
 
 	private void sendUpdateTaskRpcCall(final SimpleSlot consumerSlot,
-									final TaskManagerMessages.UpdateTask updateTaskMsg) {
+									final UpdateTask updateTaskMsg) {
+
 		if (consumerSlot != null) {
 			final Instance instance = consumerSlot.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
index 0d81fde..3fb3825 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
@@ -43,4 +43,9 @@ public class ExecutionEdge {
 	public int getInputNum() {
 		return inputNum;
 	}
+
+	@Override
+	public String toString() {
+		return "ExecutionEdge [" + source + " <=> " + target + "]";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 81f83e6..01495f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -175,17 +176,14 @@ public class ExecutionGraph implements Serializable {
 	 * Once this value has reached the number of vertices, the job is done. */
 	private int nextVertexToFinish;
 
-
 	private ActorContext parentContext;
 
 	private  ActorRef stateMonitorActor;
-	
+
 	private boolean monitoringEnabled;
-	
-	private long monitoringInterval = 10000;
 
+	private long monitoringInterval = 10000;
 
-	
 	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
 		this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>());
 	}
@@ -446,6 +444,7 @@ public class ExecutionGraph implements Serializable {
 					for (ExecutionJobVertex ejv : getVerticesTopologically()) {
 						ejv.scheduleAll(scheduler, allowQueuedScheduling);
 					}
+
 					break;
 
 				case BACKTRACKING:
@@ -623,24 +622,24 @@ public class ExecutionGraph implements Serializable {
 	
 	public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , StateHandle> states) {
 		synchronized (this.progressLock) {
-			for (Map.Entry<Tuple3<JobVertexID, Integer, Long>, StateHandle> state : states.entrySet()) {
+			for (Map.Entry<Tuple3<JobVertexID, Integer, Long>, StateHandle> state : states.entrySet())
 				tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
-			}
 		}
 	}
 
-	public void scheduleOrUpdateConsumers(ExecutionAttemptID executionId, int partitionIndex) {
-		Execution execution = currentExecutions.get(executionId);
+	public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
+
+		final Execution execution = currentExecutions.get(partitionId.getProducerId());
 
 		if (execution == null) {
 			fail(new IllegalStateException("Cannot find execution for execution ID " +
-					executionId));
+					partitionId.getProducerId()));
 		}
-		else if(execution.getVertex() == null){
-			fail(new IllegalStateException("Execution with execution ID " + executionId +
-				" has no vertex assigned."));
+		else if (execution.getVertex() == null) {
+			fail(new IllegalStateException("Execution with execution ID " +
+					partitionId.getProducerId() + " has no vertex assigned."));
 		} else {
-			execution.getVertex().scheduleOrUpdateConsumers(partitionIndex);
+			execution.getVertex().scheduleOrUpdateConsumers(partitionId);
 		}
 	}
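
The JobManager-side entry point above now routes by ResultPartitionID instead of an (execution ID, partition index) pair. The producing attempt's ID embedded in the partition ID serves as the lookup key:

    // A stale ID from an earlier execution attempt simply misses the map and
    // fails the job via the IllegalStateException above.
    final Execution execution = currentExecutions.get(partitionId.getProducerId());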
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 0444e5d..6fdc628 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -18,13 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.flink.api.common.io.StrictlyLocalAssignment;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
@@ -39,13 +32,19 @@ import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.slf4j.Logger;
-
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 
 public class ExecutionJobVertex implements Serializable {
 	
@@ -82,7 +81,6 @@ public class ExecutionJobVertex implements Serializable {
 	
 	private InputSplitAssigner splitAssigner;
 	
-	
 	public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex,
 							int defaultParallelism, FiniteDuration timeout) throws JobException {
 		this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
@@ -118,11 +116,14 @@ public class ExecutionJobVertex implements Serializable {
 		
 		// create the intermediate results
 		this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
+
 		for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
-			IntermediateDataSet set = jobVertex.getProducedDataSets().get(i);
-			this.producedDataSets[i] = new IntermediateResult(set.getId(), this, numTaskVertices);
+			final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
+
+			this.producedDataSets[i] = new IntermediateResult(
+					result.getId(), this, numTaskVertices, result.getResultType());
 		}
-		
+
 		// create all task vertices
 		for (int i = 0; i < numTaskVertices; i++) {
 			ExecutionVertex vertex = new ExecutionVertex(this, i, this.producedDataSets, timeout, createTimestamp);
@@ -374,6 +375,11 @@ public class ExecutionJobVertex implements Serializable {
 			catch (Throwable t) {
 				throw new RuntimeException("Re-creating the input split assigner failed: " + t.getMessage(), t);
 			}
+
+			// Reset intermediate results
+			for (IntermediateResult result : producedDataSets) {
+				result.resetForNewExecution();
+			}
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 794ca21..e5d9db8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -20,17 +20,19 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.deployment.PartialPartitionInfo;
-import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.PartitionInfo;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -46,11 +48,12 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import static com.google.common.base.Preconditions.checkElementIndex;
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
 import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
@@ -63,35 +66,35 @@ public class ExecutionVertex implements Serializable {
 
 	private static final long serialVersionUID = 42L;
 
+	@SuppressWarnings("unused")
 	private static final Logger LOG = ExecutionGraph.LOG;
-	
+
 	private static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	private final ExecutionJobVertex jobVertex;
-	
-	private IntermediateResultPartition[] resultPartitions;
-	
+
+	private Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
+
 	private ExecutionEdge[][] inputEdges;
 
 	private final int subTaskIndex;
-	
+
 	private final List<Execution> priorExecutions;
 
 	private final FiniteDuration timeout;
-	
+
 	private volatile CoLocationConstraint locationConstraint;
-	
+
 	private volatile Execution currentExecution;	// this field must never be null
-	
-	
+
 	private volatile List<Instance> locationConstraintInstances;
-	
+
 	private volatile boolean scheduleLocalOnly;
-	
+
 	private StateHandle operatorState;
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
@@ -105,11 +108,13 @@ public class ExecutionVertex implements Serializable {
 		this.jobVertex = jobVertex;
 		this.subTaskIndex = subTaskIndex;
 
-		this.resultPartitions = new IntermediateResultPartition[producedDataSets.length];
-		for (int i = 0; i < producedDataSets.length; i++) {
-			IntermediateResultPartition irp = new IntermediateResultPartition(producedDataSets[i], this, subTaskIndex);
-			this.resultPartitions[i] = irp;
-			producedDataSets[i].setPartition(subTaskIndex, irp);
+		this.resultPartitions = new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(producedDataSets.length, 1);
+
+		for (IntermediateResult result : producedDataSets) {
+			IntermediateResultPartition irp = new IntermediateResultPartition(result, this, subTaskIndex);
+			result.setPartition(subTaskIndex, irp);
+
+			resultPartitions.put(irp.getPartitionId(), irp);
 		}
 
 		this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
@@ -129,71 +134,71 @@ public class ExecutionVertex implements Serializable {
 
 		this.timeout = timeout;
 	}
-	
-	
+
+
 	// --------------------------------------------------------------------------------------------
 	//  Properties
 	// --------------------------------------------------------------------------------------------
-	
+
 	public JobID getJobId() {
 		return this.jobVertex.getJobId();
 	}
-	
+
 	public ExecutionJobVertex getJobVertex() {
 		return jobVertex;
 	}
-	
+
 	public JobVertexID getJobvertexId() {
 		return this.jobVertex.getJobVertexId();
 	}
-	
+
 	public String getTaskName() {
 		return this.jobVertex.getJobVertex().getName();
 	}
-	
+
 	public int getTotalNumberOfParallelSubtasks() {
 		return this.jobVertex.getParallelism();
 	}
-	
+
 	public int getParallelSubtaskIndex() {
 		return this.subTaskIndex;
 	}
-	
+
 	public int getNumberOfInputs() {
 		return this.inputEdges.length;
 	}
-	
+
 	public ExecutionEdge[] getInputEdges(int input) {
 		if (input < 0 || input >= this.inputEdges.length) {
 			throw new IllegalArgumentException(String.format("Input %d is out of range [0..%d)", input, this.inputEdges.length));
 		}
 		return inputEdges[input];
 	}
-	
+
 	public CoLocationConstraint getLocationConstraint() {
 		return locationConstraint;
 	}
-	
+
 	public Execution getCurrentExecutionAttempt() {
 		return currentExecution;
 	}
-	
+
 	public ExecutionState getExecutionState() {
 		return currentExecution.getState();
 	}
-	
+
 	public long getStateTimestamp(ExecutionState state) {
 		return currentExecution.getStateTimestamp(state);
 	}
-	
+
 	public Throwable getFailureCause() {
 		return currentExecution.getFailureCause();
 	}
-	
+
 	public SimpleSlot getCurrentAssignedResource() {
 		return currentExecution.getAssignedResource();
 	}
-	
+
 	public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
 		return currentExecution.getAssignedResourceLocation();
 	}
@@ -213,14 +218,14 @@ public class ExecutionVertex implements Serializable {
 	// --------------------------------------------------------------------------------------------
 	//  Graph building
 	// --------------------------------------------------------------------------------------------
-	
+
 	public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
-		
+
 		final DistributionPattern pattern = edge.getDistributionPattern();
 		final IntermediateResultPartition[] sourcePartitions = source.getPartitions();
-		
+
 		ExecutionEdge[] edges;
-		
+
 		switch (pattern) {
 			case POINTWISE:
 				edges = connectPointwise(sourcePartitions, inputNumber);
@@ -229,14 +234,14 @@ public class ExecutionVertex implements Serializable {
 			case ALL_TO_ALL:
 				edges = connectAllToAll(sourcePartitions, inputNumber);
 				break;
-				
+
 			default:
 				throw new RuntimeException("Unrecognized distribution pattern.");
-		
+
 		}
-		
+
 		this.inputEdges[inputNumber] = edges;
-		
+
 		// add the consumers to the source
 		// for now (until the receiver initiated handshake is in place), we need to register the 
 		// edges as the execution graph
@@ -244,22 +249,22 @@ public class ExecutionVertex implements Serializable {
 			ee.getSource().addConsumer(ee, consumerNumber);
 		}
 	}
-	
+
 	private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
 		ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];
-		
+
 		for (int i = 0; i < sourcePartitions.length; i++) {
 			IntermediateResultPartition irp = sourcePartitions[i];
 			edges[i] = new ExecutionEdge(irp, this, inputNumber);
 		}
-		
+
 		return edges;
 	}
-	
+
 	private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
 		final int numSources = sourcePartitions.length;
 		final int parallelism = getTotalNumberOfParallelSubtasks();
-	
+
 		// simple case same number of sources as targets
 		if (numSources == parallelism) {
 			return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) };
@@ -300,7 +305,7 @@ public class ExecutionVertex implements Serializable {
 
 				int start = (int) (subTaskIndex * factor);
 				int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ?
-						sourcePartitions.length : 
+						sourcePartitions.length :
 						(int) ((subTaskIndex + 1) * factor);
 
 				ExecutionEdge[] edges = new ExecutionEdge[end - start];
@@ -312,29 +317,29 @@ public class ExecutionVertex implements Serializable {
 			}
 		}
 	}
-	
+
 	public void setLocationConstraintHosts(List<Instance> instances) {
 		this.locationConstraintInstances = instances;
 	}
-	
+
 	public void setScheduleLocalOnly(boolean scheduleLocalOnly) {
 		if (scheduleLocalOnly && inputEdges != null && inputEdges.length > 0) {
 			throw new IllegalArgumentException("Strictly local scheduling is only supported for sources.");
 		}
-		
+
 		this.scheduleLocalOnly = scheduleLocalOnly;
 	}
 
 	public boolean isScheduleLocalOnly() {
 		return scheduleLocalOnly;
 	}
-	
+
 	/**
 	 * Gets the location preferences of this task, determined by the locations of the predecessors from which
 	 * it receives input data.
 	 * If there are more than MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this
 	 * method returns {@code null} to indicate no location preference.
-	 * 
+	 *
 	 * @return The preferred locations for this vertex execution, or null, if there is no preference.
 	 */
 	public Iterable<Instance> getPreferredLocations() {
@@ -343,13 +348,12 @@ public class ExecutionVertex implements Serializable {
 		if (constraintInstances != null && !constraintInstances.isEmpty()) {
 			return constraintInstances;
 		}
-		
+
 		// otherwise, base the preferred locations on the input connections
 		if (inputEdges == null) {
 			return Collections.emptySet();
 		}
 		else {
-
 			Set<Instance> locations = new HashSet<Instance>();
 			Set<Instance> inputLocations = new HashSet<Instance>();
 
@@ -398,11 +402,11 @@ public class ExecutionVertex implements Serializable {
 			Execution execution = currentExecution;
 			ExecutionState state = execution.getState();
 
-			if (state == FINISHED || state == CANCELED || state ==FAILED) {
+			if (state == FINISHED || state == CANCELED || state == FAILED) {
 				priorExecutions.add(execution);
 				currentExecution = new Execution(this, execution.getAttemptNumber()+1,
 						System.currentTimeMillis(), timeout);
-				
+
 				CoLocationGroup grp = jobVertex.getCoLocationGroup();
 				if (grp != null) {
 					this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
@@ -436,17 +440,33 @@ public class ExecutionVertex implements Serializable {
 	}
 
 	/**
-	 * Schedules or updates the {@link IntermediateResultPartition} consumer
-	 * tasks of the intermediate result partition with the given index.
+	 * Schedules or updates the consumer tasks of the result partition with the given ID.
 	 */
-	void scheduleOrUpdateConsumers(int partitionIndex) {
-		checkElementIndex(partitionIndex, resultPartitions.length);
+	void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
+
+		final Execution execution = currentExecution;
 
-		IntermediateResultPartition partition = resultPartitions[partitionIndex];
+		// Abort this request if there was a concurrent reset
+		if (!partitionId.getProducerId().equals(execution.getAttemptId())) {
+			return;
+		}
+
+		final IntermediateResultPartition partition = resultPartitions.get(partitionId.getPartitionId());
+
+		if (partition == null) {
+			throw new IllegalStateException("Unknown partition " + partitionId + ".");
+		}
 
-		currentExecution.scheduleOrUpdateConsumers(partition.getConsumers());
+		if (partition.getIntermediateResult().getResultType().isPipelined()) {
+			// Schedule or update receivers of this partition
+			execution.scheduleOrUpdateConsumers(partition.getConsumers());
+		}
+		else {
+			throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for " +
+					"pipelined partitions.");
+		}
 	}
-	
+
 	/**
 	 * This method cleans fields that are irrelevant for the archived execution attempt.
 	 */
@@ -458,22 +478,22 @@ public class ExecutionVertex implements Serializable {
 		if (!(state == FINISHED || state == CANCELED || state == FAILED)) {
 			throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state.");
 		}
-		
+
 		// prepare the current execution for archiving
 		execution.prepareForArchiving();
-		
+
 		// prepare previous executions for archiving
 		for (Execution exec : priorExecutions) {
 			exec.prepareForArchiving();
 		}
-		
+
 		// clear the unnecessary fields in this class
 		this.resultPartitions = null;
 		this.inputEdges = null;
 		this.locationConstraintInstances = null;
 	}
 
-	public void cachePartitionInfo(PartialPartitionInfo partitionInfo){
+	public void cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo){
 		getCurrentExecutionAttempt().cachePartitionInfo(partitionInfo);
 	}
 
@@ -481,6 +501,25 @@ public class ExecutionVertex implements Serializable {
 		currentExecution.sendPartitionInfos();
 	}
 
+	/**
+	 * Returns whether to schedule the next batch of receiving tasks.
+	 */
+	boolean finishAllBlockingPartitions() {
+		for (IntermediateResultPartition partition : resultPartitions.values()) {
+			// Nothing to do for pipelined results
+			if (partition.getResultType().isPipelined()) {
+				return false;
+			}
+			// It's a blocking partition, mark it as finished and return whether all blocking
+			// partitions have been produced.
+			else if (partition.markFinished()) {
+				return true;
+			}
+		}
+
+		return false;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//   Notifications from the Execution Attempt
 	// --------------------------------------------------------------------------------------------
@@ -507,20 +546,27 @@ public class ExecutionVertex implements Serializable {
 	void notifyStateTransition(ExecutionAttemptID executionId, ExecutionState newState, Throwable error) {
 		getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);
 	}
-	
-	TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID executionId, SimpleSlot slot) {
+
+	/**
+	 * Creates a task deployment descriptor to deploy a subtask to the given target slot.
+	 */
+	TaskDeploymentDescriptor createDeploymentDescriptor(
+			ExecutionAttemptID executionId,
+			SimpleSlot targetSlot) {
+
 		// Produced intermediate results
-		List<PartitionDeploymentDescriptor> producedPartitions = new ArrayList<PartitionDeploymentDescriptor>(resultPartitions.length);
+		List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>(resultPartitions.size());
 
-		for (IntermediateResultPartition partition : resultPartitions) {
-			producedPartitions.add(PartitionDeploymentDescriptor.fromIntermediateResultPartition(partition));
+		for (IntermediateResultPartition partition : resultPartitions.values()) {
+			producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition));
 		}
 
 		// Consumed intermediate results
-		List<PartitionConsumerDeploymentDescriptor> consumedPartitions = new ArrayList<PartitionConsumerDeploymentDescriptor>();
+		List<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<InputGateDeploymentDescriptor>();
 
 		for (ExecutionEdge[] edges : inputEdges) {
-			PartitionInfo[] partitions = PartitionInfo.fromEdges(edges, slot);
+			InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor
+					.fromEdges(edges, targetSlot);
 
 			// If the produced partition has multiple consumers registered, we
 			// need to request the one matching our sub task index.
@@ -531,7 +577,7 @@ public class ExecutionVertex implements Serializable {
 
 			IntermediateDataSetID resultId = edges[0].getSource().getIntermediateResult().getId();
 
-			consumedPartitions.add(new PartitionConsumerDeploymentDescriptor(resultId, partitions, queueToRequest));
+			consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, queueToRequest, partitions));
 		}
 
 		List<BlobKey> jarFiles = getExecutionGraph().getRequiredJarFiles();
@@ -539,7 +585,7 @@ public class ExecutionVertex implements Serializable {
 		return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(),
 				subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(),
 				jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(),
-				producedPartitions, consumedPartitions, jarFiles, slot.getSlotNumber(), operatorState);
+				producedPartitions, consumedPartitions, jarFiles, targetSlot.getSlotNumber(), operatorState);
 	}
 
 	// --------------------------------------------------------------------------------------------
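
The switch above from an IntermediateResultPartition[] array to a map is what enables the by-ID lookup in scheduleOrUpdateConsumers(ResultPartitionID). The declaration from the constructor, with the rationale spelled out:

    // Keyed for direct lookup by partition ID; LinkedHashMap preserves insertion
    // order, so iterating values() in createDeploymentDescriptor() still yields
    // the partitions in creation order. Load factor 1 sizes the map exactly.
    Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions =
            new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(
                    producedDataSets.length, 1);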

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 64ad2d2..658a06b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -18,8 +18,13 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionType;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 public class IntermediateResult {
 
@@ -31,20 +36,30 @@ public class IntermediateResult {
 
 	private final int numParallelProducers;
 
+	private final AtomicInteger numberOfRunningProducers;
+
 	private int partitionsAssigned;
 
 	private int numConsumers;
 
 	private final int connectionIndex;
 
-	private final IntermediateResultPartitionType resultType;
+	private final ResultPartitionType resultType;
+
+	public IntermediateResult(
+			IntermediateDataSetID id,
+			ExecutionJobVertex producer,
+			int numParallelProducers,
+			ResultPartitionType resultType) {
 
-	public IntermediateResult(IntermediateDataSetID id, ExecutionJobVertex producer, int numParallelProducers) {
-		this.id = id;
-		this.producer = producer;
+		this.id = checkNotNull(id);
+		this.producer = checkNotNull(producer);
 		this.partitions = new IntermediateResultPartition[numParallelProducers];
+		checkArgument(numParallelProducers >= 1);
 		this.numParallelProducers = numParallelProducers;
 
+		this.numberOfRunningProducers = new AtomicInteger(numParallelProducers);
+
 		// we do not set the intermediate result partitions here, because we let them be initialized by
 		// the execution vertex that produces them
 
@@ -52,8 +67,7 @@ public class IntermediateResult {
 		this.connectionIndex = (int) (Math.random() * Integer.MAX_VALUE);
 
 		// The runtime type for this produced result
-		// TODO The JobGraph generator has to decide which type of result this is
-		this.resultType = IntermediateResultPartitionType.PIPELINED;
+		this.resultType = checkNotNull(resultType);
 	}
 
 	public void setPartition(int partitionNumber, IntermediateResultPartition partition) {
@@ -85,7 +99,7 @@ public class IntermediateResult {
 		return partitionsAssigned;
 	}
 
-	public IntermediateResultPartitionType getResultType() {
+	public ResultPartitionType getResultType() {
 		return resultType;
 	}
 
@@ -104,4 +118,26 @@ public class IntermediateResult {
 	public int getConnectionIndex() {
 		return connectionIndex;
 	}
+
+	void resetForNewExecution() {
+		this.numberOfRunningProducers.set(numParallelProducers);
+	}
+
+	int decrementNumberOfRunningProducersAndGetRemaining() {
+		return numberOfRunningProducers.decrementAndGet();
+	}
+
+	boolean isConsumable() {
+		if (resultType.isPipelined()) {
+			return true;
+		}
+		else {
+			return numberOfRunningProducers.get() == 0;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "IntermediateResult " + id.toString();
+	}
 }
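
The blocking-result bookkeeping added above is a plain countdown: it starts at the parallelism, each finishing producer decrements it, the result becomes consumable at zero, and resetForNewExecution() restores it for recovery. A small runnable walk-through of that pattern (plain Java, not Flink code):

    import java.util.concurrent.atomic.AtomicInteger;

    public class ProducerCountdown {
        public static void main(String[] args) {
            final int parallelism = 3;
            AtomicInteger running = new AtomicInteger(parallelism);

            for (int producer = 0; producer < parallelism; producer++) {
                // Mirrors decrementNumberOfRunningProducersAndGetRemaining():
                int remaining = running.decrementAndGet();
                // Only the last producer to finish flips consumability to true.
                System.out.println("producer " + producer
                        + " finished, remaining=" + remaining
                        + ", consumable=" + (remaining == 0));
            }

            running.set(parallelism); // resetForNewExecution() before a restart
        }
    }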

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 7d06dca..124ceb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
 import java.util.ArrayList;
@@ -59,10 +60,18 @@ public class IntermediateResultPartition {
 		return partitionId;
 	}
 
+	ResultPartitionType getResultType() {
+		return totalResult.getResultType();
+	}
+
 	public List<List<ExecutionEdge>> getConsumers() {
 		return consumers;
 	}
 
+	public boolean isConsumable() {
+		return totalResult.isConsumable();
+	}
+
 	int addConsumerGroup() {
 		int pos = consumers.size();
 
@@ -78,4 +87,24 @@ public class IntermediateResultPartition {
 	void addConsumer(ExecutionEdge edge, int consumerNumber) {
 		consumers.get(consumerNumber).add(edge);
 	}
+
+	boolean markFinished() {
+		// Sanity check that this is only called on blocking partitions.
+		if (!getResultType().isBlocking()) {
+			throw new IllegalStateException("Tried to mark a non-blocking result partition as finished");
+		}
+
+		final int refCnt = totalResult.decrementNumberOfRunningProducersAndGetRemaining();
+
+		if (refCnt == 0) {
+			return true;
+		}
+		else if (refCnt < 0) {
+			throw new IllegalStateException("Decremented number of unfinished producers below 0. "
+					+ "This is most likely a bug in the execution state/intermediate result "
+					+ "partition management.");
+		}
+
+		return false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
index 6007db9..87a3baa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
@@ -61,7 +61,7 @@ public class ChannelReaderInputViewIterator<E> implements MutableObjectIterator<
 			segments, freeMemTarget, accessors, numBlocks);
 	}
 		
-	public ChannelReaderInputViewIterator(BlockChannelReader reader, LinkedBlockingQueue<MemorySegment> returnQueue,
+	public ChannelReaderInputViewIterator(BlockChannelReader<MemorySegment> reader, LinkedBlockingQueue<MemorySegment> returnQueue,
 			List<MemorySegment> segments, List<MemorySegment> freeMemTarget, TypeSerializer<E> accessors, int numBlocks)
 	throws IOException
 	{

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
index 9fb8072..736c245 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
@@ -39,7 +39,7 @@ import org.apache.flink.runtime.util.MathUtils;
  */
 public class FileChannelInputView extends AbstractPagedInputView {
 	
-	private final BlockChannelReader reader;
+	private final BlockChannelReader<MemorySegment> reader;
 	
 	private final MemoryManager memManager;
 	
@@ -53,7 +53,7 @@ public class FileChannelInputView extends AbstractPagedInputView {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public FileChannelInputView(BlockChannelReader reader, MemoryManager memManager, List<MemorySegment> memory, int sizeOfLastBlock) throws IOException {
+	public FileChannelInputView(BlockChannelReader<MemorySegment> reader, MemoryManager memManager, List<MemorySegment> memory, int sizeOfLastBlock) throws IOException {
 		super(0);
 		
 		checkNotNull(reader);
@@ -129,7 +129,7 @@ public class FileChannelInputView extends AbstractPagedInputView {
 		
 		// get the next segment
 		numBlocksRemaining--;
-		return reader.getNextReturnedSegment();
+		return reader.getNextReturnedBlock();
 	}
 	
 	@Override
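
The same pattern repeats across the I/O call sites in this commit: BlockChannelReader and BlockChannelWriter are now generic in their block type, so declarations name MemorySegment explicitly, e.g. (assuming an IOManager instance and a channel ID in scope):

    BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channelID);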

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
index e04759c..b6c500f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.memorymanager.MemoryManager;
  */
 public class FileChannelOutputView extends AbstractPagedOutputView {
 	
-	private final BlockChannelWriter writer;		// the writer to the channel
+	private final BlockChannelWriter<MemorySegment> writer;		// the writer to the channel
 	
 	private final MemoryManager memManager;
 	
@@ -47,7 +47,7 @@ public class FileChannelOutputView extends AbstractPagedOutputView {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public FileChannelOutputView(BlockChannelWriter writer, MemoryManager memManager, List<MemorySegment> memory, int segmentSize) throws IOException {
+	public FileChannelOutputView(BlockChannelWriter<MemorySegment> writer, MemoryManager memManager, List<MemorySegment> memory, int segmentSize) throws IOException {
 		super(segmentSize, 0);
 		
 		checkNotNull(writer);
@@ -137,7 +137,7 @@ public class FileChannelOutputView extends AbstractPagedOutputView {
 		if (current != null) {
 			writeSegment(current, posInSegment);
 		}
-		return writer.getNextReturnedSegment();
+		return writer.getNextReturnedBlock();
 	}
 	
 	private void writeSegment(MemorySegment segment, int writePosition) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
index 6098fdb..7d8d485 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
@@ -40,7 +40,7 @@ import org.apache.flink.runtime.util.MathUtils;
  */
 public class SeekableFileChannelInputView extends AbstractPagedInputView {
 	
-	private BlockChannelReader reader;
+	private BlockChannelReader<MemorySegment> reader;
 	
 	private final IOManager ioManager;
 	
@@ -127,7 +127,7 @@ public class SeekableFileChannelInputView extends AbstractPagedInputView {
 		}
 		
 		numBlocksRemaining--;
-		seekInput(reader.getNextReturnedSegment(), positionInBlock, numBlocksRemaining == 0 ? sizeOfLastBlock : segmentSize);
+		seekInput(reader.getNextReturnedBlock(), positionInBlock, numBlocksRemaining == 0 ? sizeOfLastBlock : segmentSize);
 	}
 	
 	public void close() throws IOException {
@@ -169,7 +169,7 @@ public class SeekableFileChannelInputView extends AbstractPagedInputView {
 		
 		// get the next segment
 		numBlocksRemaining--;
-		return reader.getNextReturnedSegment();
+		return reader.getNextReturnedBlock();
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
index 655a574..5f9c2cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
@@ -42,7 +42,7 @@ public class SpillingBuffer extends AbstractPagedOutputView {
 	
 	private final MemorySegmentSource memorySource;
 	
-	private BlockChannelWriter writer;
+	private BlockChannelWriter<MemorySegment> writer;
 	
 	private RandomAccessInputView inMemInView;
 	
@@ -86,7 +86,7 @@ public class SpillingBuffer extends AbstractPagedOutputView {
 					this.writer.writeBlock(this.fullSegments.get(i));
 				}
 				this.fullSegments.clear();
-				final MemorySegment seg = this.writer.getNextReturnedSegment();
+				final MemorySegment seg = this.writer.getNextReturnedBlock();
 				this.numMemorySegmentsInWriter--;
 				return seg;
 			}
@@ -94,7 +94,7 @@ public class SpillingBuffer extends AbstractPagedOutputView {
 			// spilling
 			this.writer.writeBlock(current);
 			this.blockCount++;
-			return this.writer.getNextReturnedSegment();
+			return this.writer.getNextReturnedBlock();
 		}
 	}
 	
@@ -116,7 +116,7 @@ public class SpillingBuffer extends AbstractPagedOutputView {
 				this.blockCount++;
 				this.writer.close();
 				for (int i = this.numMemorySegmentsInWriter; i > 0; i--) {
-					this.fullSegments.add(this.writer.getNextReturnedSegment());
+					this.fullSegments.add(this.writer.getNextReturnedBlock());
 				}
 				this.numMemorySegmentsInWriter = 0;
 			}
@@ -135,7 +135,7 @@ public class SpillingBuffer extends AbstractPagedOutputView {
 				this.externalInView.close();
 			}
 			
-			final BlockChannelReader reader = this.ioManager.createBlockChannelReader(this.writer.getChannelID());
+			final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(this.writer.getChannelID());
 			this.externalInView = new HeaderlessChannelReaderInputView(reader, this.fullSegments, this.blockCount, this.numBytesInLastSegment, false);
 			return this.externalInView;
 		}
@@ -161,7 +161,7 @@ public class SpillingBuffer extends AbstractPagedOutputView {
 			// closing before the first flip, collect the memory in the writer
 			this.writer.close();
 			for (int i = this.numMemorySegmentsInWriter; i > 0; i--) {
-				segments.add(this.writer.getNextReturnedSegment());
+				segments.add(this.writer.getNextReturnedBlock());
 			}
 			this.writer.closeAndDelete();
 			this.writer = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
index 3991167..e79439f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
@@ -80,7 +80,7 @@ public abstract class AbstractFileIOChannel implements FileIOChannel {
 	
 	@Override
 	public abstract void close() throws IOException;
-	
+
 	@Override
 	public void deleteChannel() {
 		if (!isClosed() || this.fileChannel.isOpen()) {
@@ -104,4 +104,9 @@ public abstract class AbstractFileIOChannel implements FileIOChannel {
 			deleteChannel();
 		}
 	}
+
+	@Override
+	public FileChannel getNioFileChannel() {
+		return fileChannel;
+	}
 }
\ No newline at end of file
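
The new getNioFileChannel() accessor exposes the underlying java.nio channel; one plausible use (an assumption on my part, not shown in this patch) is zero-copy transfer of spilled blocking-partition data:

    // Sketch: serve a byte range of the spilled file without copying through
    // user space. 'channel', 'position', 'length' and 'socketChannel' are placeholders.
    java.nio.channels.FileChannel raw = channel.getNioFileChannel();
    long transferred = raw.transferTo(position, length, socketChannel);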

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
index acfa71f..7a80b7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.core.memory.MemorySegment;
+
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.core.memory.MemorySegment;
-
 /**
  * A reader that reads data in blocks from a file channel. The reader reads the blocks into a 
  * {@link org.apache.flink.core.memory.MemorySegment} in an asynchronous fashion. That is, a read
@@ -40,7 +40,7 @@ import org.apache.flink.core.memory.MemorySegment;
  * or even whether the file was written in blocks of the same size, or in blocks at all. Ensuring that the
 * writing and reading are consistent with each other (same block sizes) is up to the programmer.
  */
-public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySegment, ReadRequest> implements BlockChannelReader {
+public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySegment, ReadRequest> implements BlockChannelReader<MemorySegment> {
 	
 	private final LinkedBlockingQueue<MemorySegment> returnSegments;
 	
@@ -57,7 +57,7 @@ public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySeg
 			LinkedBlockingQueue<MemorySegment> returnSegments)
 	throws IOException
 	{
-		super(channelID, requestQueue, new QueuingCallback(returnSegments), false);
+		super(channelID, requestQueue, new QueuingCallback<MemorySegment>(returnSegments), false);
 		this.returnSegments = returnSegments;
 	}	
 
@@ -74,7 +74,12 @@ public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySeg
 	public void readBlock(MemorySegment segment) throws IOException {
 		addRequest(new SegmentReadRequest(this, segment));
 	}
-	
+
+	@Override
+	public void seekToPosition(long position) throws IOException {
+		requestQueue.add(new SeekRequest(this, position));
+	}
+
 	/**
 	 * Gets the next memory segment that has been filled with data by the reader. This method blocks until
 	 * such a segment is available, or until an error occurs in the reader, or the reader is closed.
@@ -87,7 +92,7 @@ public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySeg
 	 * @throws IOException Thrown, if an I/O error occurs in the reader while waiting for the request to return.
 	 */
 	@Override
-	public MemorySegment getNextReturnedSegment() throws IOException {
+	public MemorySegment getNextReturnedBlock() throws IOException {
 		try {
 			while (true) {
 				final MemorySegment next = this.returnSegments.poll(1000, TimeUnit.MILLISECONDS);
@@ -115,9 +120,4 @@ public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySeg
 	public LinkedBlockingQueue<MemorySegment> getReturnQueue() {
 		return this.returnSegments;
 	}
-	
-	@Override
-	public void seekToPosition(long position) throws IOException {
-		this.requestQueue.add(new SeekRequest(this, position));
-	}
 }
\ No newline at end of file
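
To make the contract described in the class Javadoc concrete, a minimal sketch of the issue-then-collect pattern (segment list, block sizes and error handling are elided; all names are placeholders):

    BlockChannelReader<MemorySegment> reader = ioManager.createBlockChannelReader(channelID);
    for (MemorySegment segment : freeSegments) {
        reader.readBlock(segment);                            // enqueue asynchronous read requests
    }
    for (int i = 0; i < freeSegments.size(); i++) {
        MemorySegment filled = reader.getNextReturnedBlock(); // blocks until one request completes
        // ... consume 'filled'; block sizes must match how the file was written ...
    }
    reader.close();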

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
index 7e1681f..18d16a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.core.memory.MemorySegment;
 
-public class AsynchronousBlockWriter extends AsynchronousBlockWriterWithCallback implements BlockChannelWriter {
+public class AsynchronousBlockWriter extends AsynchronousBlockWriterWithCallback implements BlockChannelWriter<MemorySegment> {
 	
 	private final LinkedBlockingQueue<MemorySegment> returnSegments;
 	
@@ -41,7 +41,7 @@ public class AsynchronousBlockWriter extends AsynchronousBlockWriterWithCallback
 			LinkedBlockingQueue<MemorySegment> returnSegments)
 	throws IOException
 	{
-		super(channelID, requestQueue, new QueuingCallback(returnSegments));
+		super(channelID, requestQueue, new QueuingCallback<MemorySegment>(returnSegments));
 		this.returnSegments = returnSegments;
 	}
 	
@@ -58,7 +58,7 @@ public class AsynchronousBlockWriter extends AsynchronousBlockWriterWithCallback
 	 * @throws IOException Thrown, if an I/O error occurs in the writer while waiting for the request to return.
 	 */
 	@Override
-	public MemorySegment getNextReturnedSegment() throws IOException {
+	public MemorySegment getNextReturnedBlock() throws IOException {
 		try {
 			while (true) {
 				final MemorySegment next = returnSegments.poll(1000, TimeUnit.MILLISECONDS);
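
The writer side mirrors the reader: getNextReturnedBlock() hands back segments whose write requests have completed, so callers can reuse them. A minimal sketch (placeholder names, error handling elided):

    BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channelID);
    writer.writeBlock(segment);                              // asynchronous write request
    MemorySegment reusable = writer.getNextReturnedBlock();  // returned once the write completed
    writer.close();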

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
index c9fbdd2..e5dab47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.memory.MemorySegment;
  * An asynchronous implementation of the {@link BlockChannelWriterWithCallback} that queues I/O requests
  * and calls a callback once they have been handled.
  */
-public class AsynchronousBlockWriterWithCallback extends AsynchronousFileIOChannel<MemorySegment, WriteRequest> implements BlockChannelWriterWithCallback {
+public class AsynchronousBlockWriterWithCallback extends AsynchronousFileIOChannel<MemorySegment, WriteRequest> implements BlockChannelWriterWithCallback<MemorySegment> {
 	
 	/**
 	 * Creates a new asynchronous block writer for the given channel.
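
For reference, the contract this class is built around is the now-generic RequestDoneCallback; a custom implementation might look like the following sketch (how the writer is constructed with it is elided here):

    RequestDoneCallback<MemorySegment> callback = new RequestDoneCallback<MemorySegment>() {
        @Override
        public void requestSuccessful(MemorySegment segment) {
            // e.g. hand the segment back to a pool once its write has completed
        }

        @Override
        public void requestFailed(MemorySegment segment, IOException e) {
            // e.g. record the error; the segment still comes back to the caller
        }
    };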

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileReader.java
new file mode 100644
index 0000000..ba5f0ec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileReader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class AsynchronousBufferFileReader extends AsynchronousFileIOChannel<Buffer, ReadRequest> implements BufferFileReader {
+
+	private final AtomicBoolean hasReachedEndOfFile = new AtomicBoolean();
+
+	protected AsynchronousBufferFileReader(ID channelID, RequestQueue<ReadRequest> requestQueue, RequestDoneCallback<Buffer> callback) throws IOException {
+		super(channelID, requestQueue, callback, false);
+	}
+
+	@Override
+	public void readInto(Buffer buffer) throws IOException {
+		addRequest(new BufferReadRequest(this, buffer, hasReachedEndOfFile));
+	}
+
+	@Override
+	public void seekToPosition(long position) throws IOException {
+		requestQueue.add(new SeekRequest(this, position));
+	}
+
+	@Override
+	public boolean hasReachedEndOfFile() {
+		return hasReachedEndOfFile.get();
+	}
+}
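
A short usage sketch for the new reader (how the BufferFileReader and the free buffer are obtained is environment-specific and assumed here, not part of this file):

    // Sketch: pull the next spilled buffer back in. Completion is delivered
    // asynchronously via the RequestDoneCallback supplied at construction time.
    void requestNext(BufferFileReader reader, Buffer freeBuffer) throws IOException {
        if (!reader.hasReachedEndOfFile()) {
            reader.readInto(freeBuffer);
        }
    }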

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileSegmentReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileSegmentReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileSegmentReader.java
new file mode 100644
index 0000000..c2a277a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileSegmentReader.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class AsynchronousBufferFileSegmentReader extends AsynchronousFileIOChannel<FileSegment, ReadRequest> implements BufferFileSegmentReader {
+
+	private final AtomicBoolean hasReachedEndOfFile = new AtomicBoolean();
+
+	protected AsynchronousBufferFileSegmentReader(ID channelID, RequestQueue<ReadRequest> requestQueue, RequestDoneCallback<FileSegment> callback) throws IOException {
+		super(channelID, requestQueue, callback, false);
+	}
+
+	@Override
+	public void read() throws IOException {
+		addRequest(new FileSegmentReadRequest(this, hasReachedEndOfFile));
+	}
+
+	@Override
+	public void seekTo(long position) throws IOException {
+		requestQueue.add(new SeekRequest(this, position));
+	}
+
+	@Override
+	public boolean hasReachedEndOfFile() {
+		return hasReachedEndOfFile.get();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
new file mode 100644
index 0000000..14bb8f7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.io.IOException;
+
+public class AsynchronousBufferFileWriter extends AsynchronousFileIOChannel<Buffer, WriteRequest> implements BufferFileWriter {
+
+	private static final RecyclingCallback CALLBACK = new RecyclingCallback();
+
+	protected AsynchronousBufferFileWriter(ID channelID, RequestQueue<WriteRequest> requestQueue) throws IOException {
+		super(channelID, requestQueue, CALLBACK, true);
+	}
+
+	@Override
+	public void writeBlock(Buffer buffer) throws IOException {
+		addRequest(new BufferWriteRequest(this, buffer));
+	}
+
+	@Override
+	public int getNumberOfOutstandingRequests() {
+		return requestsNotReturned.get();
+	}
+
+	@Override
+	public boolean registerAllRequestsProcessedListener(NotificationListener listener) throws IOException {
+		return super.registerAllRequestsProcessedListener(listener);
+	}
+
+	/**
+	 * Recycles the buffer after the I/O request.
+	 */
+	private static class RecyclingCallback implements RequestDoneCallback<Buffer> {
+
+		@Override
+		public void requestSuccessful(Buffer buffer) {
+			buffer.recycle();
+		}
+
+		@Override
+		public void requestFailed(Buffer buffer, IOException e) {
+			buffer.recycle();
+		}
+	}
+}
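
A closing sketch of how a producer might drain this writer before making a blocking partition available; the listener wiring around it is illustrative, and only writeBlock, getNumberOfOutstandingRequests and registerAllRequestsProcessedListener come from this file:

    writer.writeBlock(buffer); // 'buffer' is recycled by the RecyclingCallback above

    // When finishing: either all writes are already on disk, or we ask to be
    // notified once the outstanding requests have been processed.
    if (writer.getNumberOfOutstandingRequests() > 0) {
        boolean registered = writer.registerAllRequestsProcessedListener(new NotificationListener() {
            @Override
            public void onNotification() {
                // all pending writes are done; presumably safe to expose the partition
            }
        });
        // 'registered' being false would mean the requests finished in the meantime
    }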