You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/18 17:49:00 UTC
[11/13] flink git commit: [FLINK-1350] [runtime] Add blocking result partition variant
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index 8daee7c..96b2f55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -22,13 +22,14 @@ import akka.actor.ActorRef;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
-import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -92,9 +93,8 @@ public class RuntimeEnvironment implements Environment, Runnable {
private final AtomicBoolean canceled = new AtomicBoolean();
- private final IntermediateResultPartition[] producedPartitions;
-
- private final BufferWriter[] writers;
+ private final ResultPartition[] producedPartitions;
+ private final ResultPartitionWriter[] writers;
private final SingleInputGate[] inputGates;
@@ -116,23 +116,27 @@ public class RuntimeEnvironment implements Environment, Runnable {
try {
// Produced intermediate result partitions
- final List<PartitionDeploymentDescriptor> partitions = tdd.getProducedPartitions();
+ final List<ResultPartitionDeploymentDescriptor> partitions = tdd.getProducedPartitions();
- this.producedPartitions = new IntermediateResultPartition[partitions.size()];
- this.writers = new BufferWriter[partitions.size()];
+ this.producedPartitions = new ResultPartition[partitions.size()];
+ this.writers = new ResultPartitionWriter[partitions.size()];
for (int i = 0; i < this.producedPartitions.length; i++) {
- this.producedPartitions[i] = IntermediateResultPartition.create(this, i, owner.getJobID(), owner.getExecutionId(), networkEnvironment, partitions.get(i));
- writers[i] = new BufferWriter(this.producedPartitions[i]);
+ ResultPartitionDeploymentDescriptor desc = partitions.get(i);
+ ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), owner.getExecutionId());
+
+ this.producedPartitions[i] = new ResultPartition(owner.getJobID(), partitionId, desc.getPartitionType(), desc.getNumberOfSubpartitions(), networkEnvironment, ioManager);
+
+ writers[i] = new ResultPartitionWriter(this.producedPartitions[i]);
}
// Consumed intermediate result partitions
- final List<PartitionConsumerDeploymentDescriptor> consumedPartitions = tdd.getConsumedPartitions();
+ final List<InputGateDeploymentDescriptor> consumedPartitions = tdd.getInputGates();
this.inputGates = new SingleInputGate[consumedPartitions.size()];
for (int i = 0; i < inputGates.length; i++) {
- inputGates[i] = SingleInputGate.create(networkEnvironment, consumedPartitions.get(i));
+ inputGates[i] = SingleInputGate.create(consumedPartitions.get(i), networkEnvironment);
// The input gates are organized by key for task updates/channel updates at runtime
inputGatesById.put(inputGates[i].getConsumedResultId(), inputGates[i]);
@@ -211,7 +215,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
// Finish the produced partitions
if (producedPartitions != null) {
- for (IntermediateResultPartition partition : producedPartitions) {
+ for (ResultPartition partition : producedPartitions) {
if (partition != null) {
partition.finish();
}
@@ -340,14 +344,14 @@ public class RuntimeEnvironment implements Environment, Runnable {
}
@Override
- public BufferWriter getWriter(int index) {
+ public ResultPartitionWriter getWriter(int index) {
checkElementIndex(index, writers.length, "Illegal environment writer request.");
return writers[checkElementIndex(index, writers.length)];
}
@Override
- public BufferWriter[] getAllWriters() {
+ public ResultPartitionWriter[] getAllWriters() {
return writers;
}
@@ -363,7 +367,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
return inputGates;
}
- public IntermediateResultPartition[] getProducedPartitions() {
+ public ResultPartition[] getProducedPartitions() {
return producedPartitions;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cf24b20..eda5bdf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,23 +20,22 @@ package org.apache.flink.runtime.executiongraph;
import akka.actor.ActorRef;
import akka.dispatch.OnComplete;
-import static akka.dispatch.Futures.future;
-
import akka.dispatch.OnFailure;
import akka.pattern.Patterns;
import akka.util.Timeout;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.deployment.PartialPartitionInfo;
-import org.apache.flink.runtime.deployment.PartitionInfo;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.io.network.RemoteAddress;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -44,24 +43,23 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
-
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import static com.google.common.base.Preconditions.checkArgument;
+import static akka.dispatch.Futures.future;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
import static org.apache.flink.runtime.execution.ExecutionState.CANCELING;
@@ -71,6 +69,12 @@ import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.CancelTask;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.FailIntermediateResultPartitions;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.SubmitTask;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.UpdateTask;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.UpdateTaskSinglePartitionInfo;
+import static org.apache.flink.runtime.messages.TaskManagerMessages.createUpdateTaskMultiplePartitionInfos;
/**
* A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
@@ -113,7 +117,7 @@ public class Execution implements Serializable {
private final FiniteDuration timeout;
- private ConcurrentLinkedQueue<PartialPartitionInfo> partialPartitionInfos;
+ private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
private volatile ExecutionState state = CREATED;
@@ -128,8 +132,6 @@ public class Execution implements Serializable {
// --------------------------------------------------------------------------------------------
public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, FiniteDuration timeout) {
- checkArgument(attemptNumber >= 0);
-
this.vertex = checkNotNull(vertex);
this.attemptId = new ExecutionAttemptID();
@@ -140,17 +142,17 @@ public class Execution implements Serializable {
this.timeout = timeout;
- this.partialPartitionInfos = new ConcurrentLinkedQueue<PartialPartitionInfo>();
+ this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor>();
}
// --------------------------------------------------------------------------------------------
// Properties
// --------------------------------------------------------------------------------------------
-
+
public ExecutionVertex getVertex() {
return vertex;
}
-
+
public ExecutionAttemptID getAttemptId() {
return attemptId;
}
@@ -158,15 +160,15 @@ public class Execution implements Serializable {
public int getAttemptNumber() {
return attemptNumber;
}
-
+
public ExecutionState getState() {
return state;
}
-
+
public SimpleSlot getAssignedResource() {
return assignedResource;
}
-
+
public InstanceConnectionInfo getAssignedResourceLocation() {
return assignedResourceLocation;
}
@@ -196,8 +198,8 @@ public class Execution implements Serializable {
}
assignedResource = null;
- partialPartitionInfos.clear();
- partialPartitionInfos = null;
+ partialInputChannelDeploymentDescriptors.clear();
+ partialInputChannelDeploymentDescriptors = null;
}
// --------------------------------------------------------------------------------------------
@@ -226,7 +228,7 @@ public class Execution implements Serializable {
// sanity check
if (locationConstraint != null && sharingGroup == null) {
- throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing not allowed.");
+ throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed.");
}
if (transitionState(CREATED, SCHEDULED)) {
@@ -328,7 +330,7 @@ public class Execution implements Serializable {
Instance instance = slot.getInstance();
Future<Object> deployAction = Patterns.ask(instance.getTaskManager(),
- new TaskManagerMessages.SubmitTask(deployment), new Timeout(timeout));
+ new SubmitTask(deployment), new Timeout(timeout));
deployAction.onComplete(new OnComplete<Object>(){
@@ -432,22 +434,38 @@ public class Execution implements Serializable {
}
}
- // TODO This leads to many unnecessary RPC calls in most cases
- void scheduleOrUpdateConsumers(List<List<ExecutionEdge>> consumers) {
- if (consumers.size() != 1) {
- fail(new IllegalStateException("Only one consumer is supported currently."));
+ void scheduleOrUpdateConsumers(List<List<ExecutionEdge>> allConsumers) {
+ if (allConsumers.size() != 1) {
+ fail(new IllegalStateException("Currently, only a single consumer group per partition is supported."));
}
- final List<ExecutionEdge> consumer = consumers.get(0);
-
- for (ExecutionEdge edge : consumer) {
+ for (ExecutionEdge edge : allConsumers.get(0)) {
final ExecutionVertex consumerVertex = edge.getTarget();
- final ExecutionState consumerState = consumerVertex.getExecutionState();
+ final Execution consumer = consumerVertex.getCurrentExecutionAttempt();
+ final ExecutionState consumerState = consumer.getState();
- if (consumerState == CREATED) {
- consumerVertex.cachePartitionInfo(PartialPartitionInfo.fromEdge(edge));
+ final IntermediateResultPartition partition = edge.getSource();
+ // ----------------------------------------------------------------
+ // Consumer is created => try to deploy and cache input channel
+ // descriptors if there is a deployment race
+ // ----------------------------------------------------------------
+ if (consumerState == CREATED) {
+ final Execution partitionExecution = partition.getProducer()
+ .getCurrentExecutionAttempt();
+
+ consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(
+ partition, partitionExecution));
+
+ // When deploying a consuming task, its task deployment descriptor will contain all
+ // deployment information available at the respective time. It is possible that some
+ // of the partitions to be consumed have not been created yet. These are updated
+ // runtime via the update messages.
+ //
+ // TODO The current approach may send many update messages even though the consuming
+ // task has already been deployed with all necessary information. We have to check
+ // whether this is a problem and fix it, if it is.
future(new Callable<Boolean>(){
@Override
public Boolean call() throws Exception {
@@ -468,41 +486,64 @@ public class Execution implements Serializable {
consumerVertex.sendPartitionInfos();
}
}
- else if (consumerState == RUNNING) {
- SimpleSlot consumerSlot = consumerVertex.getCurrentAssignedResource();
- ExecutionAttemptID consumerExecutionId = consumerVertex.
- getCurrentExecutionAttempt().getAttemptId();
-
- IntermediateResultPartitionID partitionID = edge.getSource().getPartitionId();
- int connectionIndex = edge.getSource().getIntermediateResult().getConnectionIndex();
-
- PartitionInfo.PartitionLocation producerLocation;
- RemoteAddress producerAddress = null;
-
- if(consumerSlot.getInstance().getInstanceConnectionInfo().equals(
- getAssignedResourceLocation())) {
- producerLocation = PartitionInfo.PartitionLocation.LOCAL;
- } else {
- producerLocation = PartitionInfo.PartitionLocation.REMOTE;
- producerAddress = new RemoteAddress(getAssignedResourceLocation(),
- connectionIndex);
- }
+ // ----------------------------------------------------------------
+ // Consumer is running => send update message now
+ // ----------------------------------------------------------------
+ else {
+ if (consumerState == RUNNING) {
+ final SimpleSlot consumerSlot = consumer.getAssignedResource();
- PartitionInfo partitionInfo = new PartitionInfo(partitionID, attemptId,
- producerLocation, producerAddress);
+ if (consumerSlot == null) {
+ // The consumer has been reset concurrently
+ continue;
+ }
- TaskManagerMessages.UpdateTask updateTaskMessage =
- new TaskManagerMessages.UpdateTaskSinglePartitionInfo(consumerExecutionId,
- edge.getSource().getIntermediateResult().getId(), partitionInfo);
+ final Instance consumerInstance = consumerSlot.getInstance();
- sendUpdateTaskRpcCall(consumerSlot, updateTaskMessage);
- }
- else if (consumerState == SCHEDULED || consumerState == DEPLOYING) {
- consumerVertex.cachePartitionInfo(PartialPartitionInfo.fromEdge(edge));
+ final ResultPartitionID partitionId = new ResultPartitionID(
+ partition.getPartitionId(), attemptId);
- // double check to resolve race conditions
- if(consumerVertex.getExecutionState() == RUNNING){
- consumerVertex.sendPartitionInfos();
+ final Instance partitionInstance = partition.getProducer()
+ .getCurrentAssignedResource().getInstance();
+
+ final ResultPartitionLocation partitionLocation;
+
+ if (consumerInstance.equals(partitionInstance)) {
+ // Consuming task is deployed to the same instance as the partition => local
+ partitionLocation = ResultPartitionLocation.createLocal();
+ }
+ else {
+ // Different instances => remote
+ final ConnectionID connectionId = new ConnectionID(
+ partitionInstance.getInstanceConnectionInfo(),
+ partition.getIntermediateResult().getConnectionIndex());
+
+ partitionLocation = ResultPartitionLocation.createRemote(connectionId);
+ }
+
+ final InputChannelDeploymentDescriptor descriptor = new InputChannelDeploymentDescriptor(
+ partitionId, partitionLocation);
+
+ final UpdateTask updateTaskMessage = new UpdateTaskSinglePartitionInfo(
+ consumer.getAttemptId(), partition.getIntermediateResult().getId(), descriptor);
+
+ sendUpdateTaskRpcCall(consumerSlot, updateTaskMessage);
+ }
+ // ----------------------------------------------------------------
+ // Consumer is scheduled or deploying => cache input channel
+ // deployment descriptors and send update message later
+ // ----------------------------------------------------------------
+ else if (consumerState == SCHEDULED || consumerState == DEPLOYING) {
+ final Execution partitionExecution = partition.getProducer()
+ .getCurrentExecutionAttempt();
+
+ consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor
+ .fromEdge(partition, partitionExecution));
+
+ // double check to resolve race conditions
+ if (consumerVertex.getExecutionState() == RUNNING) {
+ consumerVertex.sendPartitionInfos();
+ }
}
}
}
@@ -543,6 +584,23 @@ public class Execution implements Serializable {
if (transitionState(current, FINISHED)) {
try {
+ if (getVertex().finishAllBlockingPartitions()) {
+
+ IntermediateResult[] allResults = getVertex().getJobVertex()
+ .getProducedDataSets();
+
+ LOG.debug("Finished all produced partitions ({}). Scheduling all receivers " +
+ "of the following datasets: {}.", this, Arrays
+ .toString(allResults));
+
+ // Schedule next batch
+ for (IntermediateResult result : allResults) {
+ for (IntermediateResultPartition p : result.getPartitions()) {
+ scheduleOrUpdateConsumers(p.getConsumers());
+ }
+ }
+ }
+
assignedResource.releaseSlot();
vertex.getExecutionGraph().deregisterExecution(this);
}
@@ -612,28 +670,28 @@ public class Execution implements Serializable {
}
}
- void cachePartitionInfo(PartialPartitionInfo partitionInfo) {
- partialPartitionInfos.add(partitionInfo);
+ void cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo) {
+ partialInputChannelDeploymentDescriptors.add(partitionInfo);
}
void sendPartitionInfos() {
// check if the ExecutionVertex has already been archived and thus cleared the
// partial partition infos queue
- if(partialPartitionInfos != null && !partialPartitionInfos.isEmpty()) {
+ if(partialInputChannelDeploymentDescriptors != null && !partialInputChannelDeploymentDescriptors.isEmpty()) {
- PartialPartitionInfo partialPartitionInfo;
+ PartialInputChannelDeploymentDescriptor partialInputChannelDeploymentDescriptor;
List<IntermediateDataSetID> resultIDs = new ArrayList<IntermediateDataSetID>();
- List<PartitionInfo> partitionInfos = new ArrayList<PartitionInfo>();
+ List<InputChannelDeploymentDescriptor> inputChannelDeploymentDescriptors = new ArrayList<InputChannelDeploymentDescriptor>();
- while ((partialPartitionInfo = partialPartitionInfos.poll()) != null) {
- resultIDs.add(partialPartitionInfo.getIntermediateDataSetID());
- partitionInfos.add(partialPartitionInfo.createPartitionInfo(this));
+ while ((partialInputChannelDeploymentDescriptor = partialInputChannelDeploymentDescriptors.poll()) != null) {
+ resultIDs.add(partialInputChannelDeploymentDescriptor.getResultId());
+ inputChannelDeploymentDescriptors.add(partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this));
}
- TaskManagerMessages.UpdateTask updateTaskMessage =
- TaskManagerMessages.createUpdateTaskMultiplePartitionInfos(attemptId, resultIDs,
- partitionInfos);
+ UpdateTask updateTaskMessage =
+ createUpdateTaskMultiplePartitionInfos(attemptId, resultIDs,
+ inputChannelDeploymentDescriptors);
sendUpdateTaskRpcCall(assignedResource, updateTaskMessage);
}
@@ -752,7 +810,7 @@ public class Execution implements Serializable {
if (slot != null) {
Future<Object> cancelResult = AkkaUtils.retry(slot.getInstance().getTaskManager(), new
- TaskManagerMessages.CancelTask(attemptId), NUM_CANCEL_CALL_TRIES,
+ CancelTask(attemptId), NUM_CANCEL_CALL_TRIES,
AkkaUtils.globalExecutionContext(), timeout);
cancelResult.onComplete(new OnComplete<Object>() {
@@ -782,7 +840,7 @@ public class Execution implements Serializable {
if (instance.isAlive()) {
try {
// TODO For some tests this could be a problem when querying too early if all resources were released
- instance.getTaskManager().tell(new TaskManagerMessages.FailIntermediateResultPartitions(attemptId), ActorRef.noSender());
+ instance.getTaskManager().tell(new FailIntermediateResultPartitions(attemptId), ActorRef.noSender());
} catch (Throwable t) {
fail(new Exception("Intermediate result partition could not be failed.", t));
}
@@ -791,7 +849,8 @@ public class Execution implements Serializable {
}
private void sendUpdateTaskRpcCall(final SimpleSlot consumerSlot,
- final TaskManagerMessages.UpdateTask updateTaskMsg) {
+ final UpdateTask updateTaskMsg) {
+
if (consumerSlot != null) {
final Instance instance = consumerSlot.getInstance();
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
index 0d81fde..3fb3825 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
@@ -43,4 +43,9 @@ public class ExecutionEdge {
public int getInputNum() {
return inputNum;
}
+
+ @Override
+ public String toString() {
+ return "ExecutionEdge [" + source + " <=> " + target + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 81f83e6..01495f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobID;
@@ -175,17 +176,14 @@ public class ExecutionGraph implements Serializable {
* Once this value has reached the number of vertices, the job is done. */
private int nextVertexToFinish;
-
private ActorContext parentContext;
private ActorRef stateMonitorActor;
-
+
private boolean monitoringEnabled;
-
- private long monitoringInterval = 10000;
+ private long monitoringInterval = 10000;
-
public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>());
}
@@ -446,6 +444,7 @@ public class ExecutionGraph implements Serializable {
for (ExecutionJobVertex ejv : getVerticesTopologically()) {
ejv.scheduleAll(scheduler, allowQueuedScheduling);
}
+
break;
case BACKTRACKING:
@@ -623,24 +622,24 @@ public class ExecutionGraph implements Serializable {
public void loadOperatorStates(Map<Tuple3<JobVertexID, Integer, Long> , StateHandle> states) {
synchronized (this.progressLock) {
- for (Map.Entry<Tuple3<JobVertexID, Integer, Long>, StateHandle> state : states.entrySet()) {
+ for (Map.Entry<Tuple3<JobVertexID, Integer, Long>, StateHandle> state : states.entrySet())
tasks.get(state.getKey()._1()).getTaskVertices()[state.getKey()._2()].setOperatorState(state.getValue());
- }
}
}
- public void scheduleOrUpdateConsumers(ExecutionAttemptID executionId, int partitionIndex) {
- Execution execution = currentExecutions.get(executionId);
+ public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
+
+ final Execution execution = currentExecutions.get(partitionId.getProducerId());
if (execution == null) {
fail(new IllegalStateException("Cannot find execution for execution ID " +
- executionId));
+ partitionId.getPartitionId()));
}
- else if(execution.getVertex() == null){
- fail(new IllegalStateException("Execution with execution ID " + executionId +
- " has no vertex assigned."));
+ else if (execution.getVertex() == null){
+ fail(new IllegalStateException("Execution with execution ID " +
+ partitionId.getPartitionId() + " has no vertex assigned."));
} else {
- execution.getVertex().scheduleOrUpdateConsumers(partitionIndex);
+ execution.getVertex().scheduleOrUpdateConsumers(partitionId);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 0444e5d..6fdc628 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -18,13 +18,6 @@
package org.apache.flink.runtime.executiongraph;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.apache.flink.api.common.io.StrictlyLocalAssignment;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
@@ -39,13 +32,19 @@ import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.slf4j.Logger;
-
import scala.concurrent.duration.FiniteDuration;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
public class ExecutionJobVertex implements Serializable {
@@ -82,7 +81,6 @@ public class ExecutionJobVertex implements Serializable {
private InputSplitAssigner splitAssigner;
-
public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex,
int defaultParallelism, FiniteDuration timeout) throws JobException {
this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
@@ -118,11 +116,14 @@ public class ExecutionJobVertex implements Serializable {
// create the intermediate results
this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
+
for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
- IntermediateDataSet set = jobVertex.getProducedDataSets().get(i);
- this.producedDataSets[i] = new IntermediateResult(set.getId(), this, numTaskVertices);
+ final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
+
+ this.producedDataSets[i] = new IntermediateResult(
+ result.getId(), this, numTaskVertices, result.getResultType());
}
-
+
// create all task vertices
for (int i = 0; i < numTaskVertices; i++) {
ExecutionVertex vertex = new ExecutionVertex(this, i, this.producedDataSets, timeout, createTimestamp);
@@ -374,6 +375,11 @@ public class ExecutionJobVertex implements Serializable {
catch (Throwable t) {
throw new RuntimeException("Re-creating the input split assigner failed: " + t.getMessage(), t);
}
+
+ // Reset intermediate results
+ for (IntermediateResult result : producedDataSets) {
+ result.resetForNewExecution();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 794ca21..e5d9db8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -20,17 +20,19 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.deployment.PartialPartitionInfo;
-import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.PartitionInfo;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -46,11 +48,12 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
-import static com.google.common.base.Preconditions.checkElementIndex;
import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
@@ -63,35 +66,35 @@ public class ExecutionVertex implements Serializable {
private static final long serialVersionUID = 42L;
+ @SuppressWarnings("unused")
private static final Logger LOG = ExecutionGraph.LOG;
-
+
private static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;
-
+
// --------------------------------------------------------------------------------------------
-
+
private final ExecutionJobVertex jobVertex;
-
- private IntermediateResultPartition[] resultPartitions;
-
+
+ private Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
+
private ExecutionEdge[][] inputEdges;
private final int subTaskIndex;
-
+
private final List<Execution> priorExecutions;
private final FiniteDuration timeout;
-
+
private volatile CoLocationConstraint locationConstraint;
-
+
private volatile Execution currentExecution; // this field must never be null
-
-
+
private volatile List<Instance> locationConstraintInstances;
-
+
private volatile boolean scheduleLocalOnly;
-
+
private StateHandle operatorState;
-
+
// --------------------------------------------------------------------------------------------
public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
@@ -105,11 +108,13 @@ public class ExecutionVertex implements Serializable {
this.jobVertex = jobVertex;
this.subTaskIndex = subTaskIndex;
- this.resultPartitions = new IntermediateResultPartition[producedDataSets.length];
- for (int i = 0; i < producedDataSets.length; i++) {
- IntermediateResultPartition irp = new IntermediateResultPartition(producedDataSets[i], this, subTaskIndex);
- this.resultPartitions[i] = irp;
- producedDataSets[i].setPartition(subTaskIndex, irp);
+ this.resultPartitions = new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(producedDataSets.length, 1);
+
+ for (IntermediateResult result : producedDataSets) {
+ IntermediateResultPartition irp = new IntermediateResultPartition(result, this, subTaskIndex);
+ result.setPartition(subTaskIndex, irp);
+
+ resultPartitions.put(irp.getPartitionId(), irp);
}
this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
@@ -129,71 +134,71 @@ public class ExecutionVertex implements Serializable {
this.timeout = timeout;
}
-
-
+
+
// --------------------------------------------------------------------------------------------
// Properties
// --------------------------------------------------------------------------------------------
-
+
public JobID getJobId() {
return this.jobVertex.getJobId();
}
-
+
public ExecutionJobVertex getJobVertex() {
return jobVertex;
}
-
+
public JobVertexID getJobvertexId() {
return this.jobVertex.getJobVertexId();
}
-
+
public String getTaskName() {
return this.jobVertex.getJobVertex().getName();
}
-
+
public int getTotalNumberOfParallelSubtasks() {
return this.jobVertex.getParallelism();
}
-
+
public int getParallelSubtaskIndex() {
return this.subTaskIndex;
}
-
+
public int getNumberOfInputs() {
return this.inputEdges.length;
}
-
+
public ExecutionEdge[] getInputEdges(int input) {
if (input < 0 || input >= this.inputEdges.length) {
throw new IllegalArgumentException(String.format("Input %d is out of range [0..%d)", input, this.inputEdges.length));
}
return inputEdges[input];
}
-
+
public CoLocationConstraint getLocationConstraint() {
return locationConstraint;
}
-
+
public Execution getCurrentExecutionAttempt() {
return currentExecution;
}
-
+
public ExecutionState getExecutionState() {
return currentExecution.getState();
}
-
+
public long getStateTimestamp(ExecutionState state) {
return currentExecution.getStateTimestamp(state);
}
-
+
public Throwable getFailureCause() {
return currentExecution.getFailureCause();
}
-
+
public SimpleSlot getCurrentAssignedResource() {
return currentExecution.getAssignedResource();
}
-
+
public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
return currentExecution.getAssignedResourceLocation();
}
@@ -213,14 +218,14 @@ public class ExecutionVertex implements Serializable {
// --------------------------------------------------------------------------------------------
// Graph building
// --------------------------------------------------------------------------------------------
-
+
public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
-
+
final DistributionPattern pattern = edge.getDistributionPattern();
final IntermediateResultPartition[] sourcePartitions = source.getPartitions();
-
+
ExecutionEdge[] edges;
-
+
switch (pattern) {
case POINTWISE:
edges = connectPointwise(sourcePartitions, inputNumber);
@@ -229,14 +234,14 @@ public class ExecutionVertex implements Serializable {
case ALL_TO_ALL:
edges = connectAllToAll(sourcePartitions, inputNumber);
break;
-
+
default:
throw new RuntimeException("Unrecognized distribution pattern.");
-
+
}
-
+
this.inputEdges[inputNumber] = edges;
-
+
// add the consumers to the source
// for now (until the receiver initiated handshake is in place), we need to register the
// edges as the execution graph
@@ -244,22 +249,22 @@ public class ExecutionVertex implements Serializable {
ee.getSource().addConsumer(ee, consumerNumber);
}
}
-
+
private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];
-
+
for (int i = 0; i < sourcePartitions.length; i++) {
IntermediateResultPartition irp = sourcePartitions[i];
edges[i] = new ExecutionEdge(irp, this, inputNumber);
}
-
+
return edges;
}
-
+
private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
final int numSources = sourcePartitions.length;
final int parallelism = getTotalNumberOfParallelSubtasks();
-
+
// simple case same number of sources as targets
if (numSources == parallelism) {
return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) };
@@ -300,7 +305,7 @@ public class ExecutionVertex implements Serializable {
int start = (int) (subTaskIndex * factor);
int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ?
- sourcePartitions.length :
+ sourcePartitions.length :
(int) ((subTaskIndex + 1) * factor);
ExecutionEdge[] edges = new ExecutionEdge[end - start];
@@ -312,29 +317,29 @@ public class ExecutionVertex implements Serializable {
}
}
}
-
+
public void setLocationConstraintHosts(List<Instance> instances) {
this.locationConstraintInstances = instances;
}
-
+
public void setScheduleLocalOnly(boolean scheduleLocalOnly) {
if (scheduleLocalOnly && inputEdges != null && inputEdges.length > 0) {
throw new IllegalArgumentException("Strictly local scheduling is only supported for sources.");
}
-
+
this.scheduleLocalOnly = scheduleLocalOnly;
}
public boolean isScheduleLocalOnly() {
return scheduleLocalOnly;
}
-
+
/**
* Gets the location preferences of this task, determined by the locations of the predecessors from which
* it receives input data.
* If there are more than MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this
* method returns {@code null} to indicate no location preference.
- *
+ *
* @return The preferred locations for this vertex execution, or null, if there is no preference.
*/
public Iterable<Instance> getPreferredLocations() {
@@ -343,13 +348,12 @@ public class ExecutionVertex implements Serializable {
if (constraintInstances != null && !constraintInstances.isEmpty()) {
return constraintInstances;
}
-
+
// otherwise, base the preferred locations on the input connections
if (inputEdges == null) {
return Collections.emptySet();
}
else {
-
Set<Instance> locations = new HashSet<Instance>();
Set<Instance> inputLocations = new HashSet<Instance>();
@@ -398,11 +402,11 @@ public class ExecutionVertex implements Serializable {
Execution execution = currentExecution;
ExecutionState state = execution.getState();
- if (state == FINISHED || state == CANCELED || state ==FAILED) {
+ if (state == FINISHED || state == CANCELED || state == FAILED) {
priorExecutions.add(execution);
currentExecution = new Execution(this, execution.getAttemptNumber()+1,
System.currentTimeMillis(), timeout);
-
+
CoLocationGroup grp = jobVertex.getCoLocationGroup();
if (grp != null) {
this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
@@ -436,17 +440,40 @@ public class ExecutionVertex implements Serializable {
}
/**
- * Schedules or updates the {@link IntermediateResultPartition} consumer
- * tasks of the intermediate result partition with the given index.
+ * Schedules or updates the consumer tasks of the result partition with the given ID.
+ *
+ * <p>The request is ignored if it was issued by an execution attempt other than the
+ * current one, e.g. after a concurrent reset of this vertex.
+ *
+ * @param partitionId the produced partition together with the ID of its producing attempt
+ * @throws IllegalStateException if this vertex does not produce the given partition
+ * @throws IllegalArgumentException if the partition does not belong to a pipelined result
*/
- void scheduleOrUpdateConsumers(int partitionIndex) {
- checkElementIndex(partitionIndex, resultPartitions.length);
+ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
+
+ final Execution execution = currentExecution;
- IntermediateResultPartition partition = resultPartitions[partitionIndex];
+ // Abort this request if there was a concurrent reset
+ if (!partitionId.getProducerId().equals(execution.getAttemptId())) {
+ return;
+ }
+
+ final IntermediateResultPartition partition = resultPartitions.get(partitionId.getPartitionId());
+
+ if (partition == null) {
+ throw new IllegalStateException("Unknown partition " + partitionId + ".");
+ }
- currentExecution.scheduleOrUpdateConsumers(partition.getConsumers());
+ if (partition.getResultType().isPipelined()) {
+ // Schedule or update receivers of this partition
+ execution.scheduleOrUpdateConsumers(partition.getConsumers());
+ }
+ else {
+ throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for " +
+ "pipelined partitions.");
+ }
}
-
+
/**
* This method cleans fields that are irrelevant for the archived execution attempt.
*/
@@ -458,22 +478,22 @@ public class ExecutionVertex implements Serializable {
if (!(state == FINISHED || state == CANCELED || state == FAILED)) {
throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state.");
}
-
+
// prepare the current execution for archiving
execution.prepareForArchiving();
-
+
// prepare previous executions for archiving
for (Execution exec : priorExecutions) {
exec.prepareForArchiving();
}
-
+
// clear the unnecessary fields in this class
this.resultPartitions = null;
this.inputEdges = null;
this.locationConstraintInstances = null;
}
- public void cachePartitionInfo(PartialPartitionInfo partitionInfo){
+ public void cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo){
getCurrentExecutionAttempt().cachePartitionInfo(partitionInfo);
}
@@ -481,6 +501,30 @@ public class ExecutionVertex implements Serializable {
currentExecution.sendPartitionInfos();
}
+ /**
+ * Marks all blocking result partitions produced by this vertex as finished.
+ *
+ * <p>Every blocking partition is marked (there is no early exit), so that the
+ * running-producer count of each produced blocking result is decremented.
+ * Pipelined partitions are skipped.
+ *
+ * @return whether at least one blocking intermediate result has been fully
+ * produced, i.e. whether to schedule the next batch of receiving tasks.
+ */
+ boolean finishAllBlockingPartitions() {
+ boolean scheduleReceivers = false;
+
+ for (IntermediateResultPartition partition : resultPartitions.values()) {
+ // Do not return early: a pipelined partition may precede blocking ones, and
+ // every blocking partition must be marked to keep the producer counts right.
+ if (partition.getResultType().isBlocking() && partition.markFinished()) {
+ scheduleReceivers = true;
+ }
+ }
+
+ return scheduleReceivers;
+ }
+
// --------------------------------------------------------------------------------------------
// Notifications from the Execution Attempt
// --------------------------------------------------------------------------------------------
@@ -507,20 +546,27 @@ public class ExecutionVertex implements Serializable {
void notifyStateTransition(ExecutionAttemptID executionId, ExecutionState newState, Throwable error) {
getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);
}
-
- TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID executionId, SimpleSlot slot) {
+
+ /**
+ * Creates a task deployment descriptor to deploy a subtask to the given target slot.
+ */
+ TaskDeploymentDescriptor createDeploymentDescriptor(
+ ExecutionAttemptID executionId,
+ SimpleSlot targetSlot) {
+
// Produced intermediate results
- List<PartitionDeploymentDescriptor> producedPartitions = new ArrayList<PartitionDeploymentDescriptor>(resultPartitions.length);
+ List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>(resultPartitions.size());
- for (IntermediateResultPartition partition : resultPartitions) {
- producedPartitions.add(PartitionDeploymentDescriptor.fromIntermediateResultPartition(partition));
+ for (IntermediateResultPartition partition : resultPartitions.values()) {
+ producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition));
}
// Consumed intermediate results
- List<PartitionConsumerDeploymentDescriptor> consumedPartitions = new ArrayList<PartitionConsumerDeploymentDescriptor>();
+ List<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<InputGateDeploymentDescriptor>();
for (ExecutionEdge[] edges : inputEdges) {
- PartitionInfo[] partitions = PartitionInfo.fromEdges(edges, slot);
+ InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor
+ .fromEdges(edges, targetSlot);
// If the produced partition has multiple consumers registered, we
// need to request the one matching our sub task index.
@@ -531,7 +577,7 @@ public class ExecutionVertex implements Serializable {
IntermediateDataSetID resultId = edges[0].getSource().getIntermediateResult().getId();
- consumedPartitions.add(new PartitionConsumerDeploymentDescriptor(resultId, partitions, queueToRequest));
+ consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, queueToRequest, partitions));
}
List<BlobKey> jarFiles = getExecutionGraph().getRequiredJarFiles();
@@ -539,7 +585,7 @@ public class ExecutionVertex implements Serializable {
return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(),
subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(),
jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(),
- producedPartitions, consumedPartitions, jarFiles, slot.getSlotNumber(), operatorState);
+ producedPartitions, consumedPartitions, jarFiles, targetSlot.getSlotNumber(), operatorState);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 64ad2d2..658a06b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -18,8 +18,13 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionType;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
public class IntermediateResult {
@@ -31,20 +36,31 @@ public class IntermediateResult {
private final int numParallelProducers;
+ private final AtomicInteger numberOfRunningProducers;
+
private int partitionsAssigned;
private int numConsumers;
private final int connectionIndex;
- private final IntermediateResultPartitionType resultType;
+ private final ResultPartitionType resultType;
+
+ public IntermediateResult(
+ IntermediateDataSetID id,
+ ExecutionJobVertex producer,
+ int numParallelProducers,
+ ResultPartitionType resultType) {
- public IntermediateResult(IntermediateDataSetID id, ExecutionJobVertex producer, int numParallelProducers) {
- this.id = id;
- this.producer = producer;
+ this.id = checkNotNull(id);
+ this.producer = checkNotNull(producer);
+ // Validate before allocating, so that a negative count fails with an IllegalArgumentException.
+ checkArgument(numParallelProducers >= 1, "Number of parallel producers must be at least 1.");
this.partitions = new IntermediateResultPartition[numParallelProducers];
this.numParallelProducers = numParallelProducers;
+ this.numberOfRunningProducers = new AtomicInteger(numParallelProducers);
+
// we do not set the intermediate result partitions here, because we let them be initialized by
// the execution vertex that produces them
@@ -52,8 +67,7 @@ public class IntermediateResult {
this.connectionIndex = (int) (Math.random() * Integer.MAX_VALUE);
// The runtime type for this produced result
- // TODO The JobGraph generator has to decide which type of result this is
- this.resultType = IntermediateResultPartitionType.PIPELINED;
+ this.resultType = checkNotNull(resultType);
}
public void setPartition(int partitionNumber, IntermediateResultPartition partition) {
@@ -85,7 +99,7 @@ public class IntermediateResult {
return partitionsAssigned;
}
- public IntermediateResultPartitionType getResultType() {
+ public ResultPartitionType getResultType() {
return resultType;
}
@@ -104,4 +118,26 @@ public class IntermediateResult {
public int getConnectionIndex() {
return connectionIndex;
}
+
+ void resetForNewExecution() {
+ this.numberOfRunningProducers.set(numParallelProducers);
+ }
+
+ int decrementNumberOfRunningProducersAndGetRemaining() {
+ return numberOfRunningProducers.decrementAndGet();
+ }
+
+ boolean isConsumable() {
+ if (resultType.isPipelined()) {
+ return true;
+ }
+ else {
+ return numberOfRunningProducers.get() == 0;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "IntermediateResult " + id.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 7d06dca..124ceb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import java.util.ArrayList;
@@ -59,10 +60,18 @@ public class IntermediateResultPartition {
return partitionId;
}
+ ResultPartitionType getResultType() {
+ return totalResult.getResultType();
+ }
+
public List<List<ExecutionEdge>> getConsumers() {
return consumers;
}
+ public boolean isConsumable() {
+ return totalResult.isConsumable();
+ }
+
int addConsumerGroup() {
int pos = consumers.size();
@@ -78,4 +87,24 @@ public class IntermediateResultPartition {
void addConsumer(ExecutionEdge edge, int consumerNumber) {
consumers.get(consumerNumber).add(edge);
}
+
+ boolean markFinished() {
+ // Sanity check that this is only called on blocking partitions.
+ if (!getResultType().isBlocking()) {
+ throw new IllegalStateException("Tried to mark a non-blocking result partition as finished");
+ }
+
+ final int refCnt = totalResult.decrementNumberOfRunningProducersAndGetRemaining();
+
+ if (refCnt == 0) {
+ return true;
+ }
+ else if (refCnt < 0) {
+ throw new IllegalStateException("Decremented number of unfinished producers below 0. "
+ + "This is most likely a bug in the execution state/intermediate result "
+ + "partition management.");
+ }
+
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
index 6007db9..87a3baa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
@@ -61,7 +61,7 @@ public class ChannelReaderInputViewIterator<E> implements MutableObjectIterator<
segments, freeMemTarget, accessors, numBlocks);
}
- public ChannelReaderInputViewIterator(BlockChannelReader reader, LinkedBlockingQueue<MemorySegment> returnQueue,
+ public ChannelReaderInputViewIterator(BlockChannelReader<MemorySegment> reader, LinkedBlockingQueue<MemorySegment> returnQueue,
List<MemorySegment> segments, List<MemorySegment> freeMemTarget, TypeSerializer<E> accessors, int numBlocks)
throws IOException
{
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
index 9fb8072..736c245 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
@@ -39,7 +39,7 @@ import org.apache.flink.runtime.util.MathUtils;
*/
public class FileChannelInputView extends AbstractPagedInputView {
- private final BlockChannelReader reader;
+ private final BlockChannelReader<MemorySegment> reader;
private final MemoryManager memManager;
@@ -53,7 +53,7 @@ public class FileChannelInputView extends AbstractPagedInputView {
// --------------------------------------------------------------------------------------------
- public FileChannelInputView(BlockChannelReader reader, MemoryManager memManager, List<MemorySegment> memory, int sizeOfLastBlock) throws IOException {
+ public FileChannelInputView(BlockChannelReader<MemorySegment> reader, MemoryManager memManager, List<MemorySegment> memory, int sizeOfLastBlock) throws IOException {
super(0);
checkNotNull(reader);
@@ -129,7 +129,7 @@ public class FileChannelInputView extends AbstractPagedInputView {
// get the next segment
numBlocksRemaining--;
- return reader.getNextReturnedSegment();
+ return reader.getNextReturnedBlock();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
index e04759c..b6c500f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.memorymanager.MemoryManager;
*/
public class FileChannelOutputView extends AbstractPagedOutputView {
- private final BlockChannelWriter writer; // the writer to the channel
+ private final BlockChannelWriter<MemorySegment> writer; // the writer to the channel
private final MemoryManager memManager;
@@ -47,7 +47,7 @@ public class FileChannelOutputView extends AbstractPagedOutputView {
// --------------------------------------------------------------------------------------------
- public FileChannelOutputView(BlockChannelWriter writer, MemoryManager memManager, List<MemorySegment> memory, int segmentSize) throws IOException {
+ public FileChannelOutputView(BlockChannelWriter<MemorySegment> writer, MemoryManager memManager, List<MemorySegment> memory, int segmentSize) throws IOException {
super(segmentSize, 0);
checkNotNull(writer);
@@ -137,7 +137,7 @@ public class FileChannelOutputView extends AbstractPagedOutputView {
if (current != null) {
writeSegment(current, posInSegment);
}
- return writer.getNextReturnedSegment();
+ return writer.getNextReturnedBlock();
}
private void writeSegment(MemorySegment segment, int writePosition) throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
index 6098fdb..7d8d485 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
@@ -40,7 +40,7 @@ import org.apache.flink.runtime.util.MathUtils;
*/
public class SeekableFileChannelInputView extends AbstractPagedInputView {
- private BlockChannelReader reader;
+ private BlockChannelReader<MemorySegment> reader;
private final IOManager ioManager;
@@ -127,7 +127,7 @@ public class SeekableFileChannelInputView extends AbstractPagedInputView {
}
numBlocksRemaining--;
- seekInput(reader.getNextReturnedSegment(), positionInBlock, numBlocksRemaining == 0 ? sizeOfLastBlock : segmentSize);
+ seekInput(reader.getNextReturnedBlock(), positionInBlock, numBlocksRemaining == 0 ? sizeOfLastBlock : segmentSize);
}
public void close() throws IOException {
@@ -169,7 +169,7 @@ public class SeekableFileChannelInputView extends AbstractPagedInputView {
// get the next segment
numBlocksRemaining--;
- return reader.getNextReturnedSegment();
+ return reader.getNextReturnedBlock();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
index 655a574..5f9c2cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SpillingBuffer.java
@@ -42,7 +42,7 @@ public class SpillingBuffer extends AbstractPagedOutputView {
private final MemorySegmentSource memorySource;
- private BlockChannelWriter writer;
+ private BlockChannelWriter<MemorySegment> writer;
private RandomAccessInputView inMemInView;
@@ -86,7 +86,7 @@ public class SpillingBuffer extends AbstractPagedOutputView {
this.writer.writeBlock(this.fullSegments.get(i));
}
this.fullSegments.clear();
- final MemorySegment seg = this.writer.getNextReturnedSegment();
+ final MemorySegment seg = this.writer.getNextReturnedBlock();
this.numMemorySegmentsInWriter--;
return seg;
}
@@ -94,7 +94,7 @@ public class SpillingBuffer extends AbstractPagedOutputView {
// spilling
this.writer.writeBlock(current);
this.blockCount++;
- return this.writer.getNextReturnedSegment();
+ return this.writer.getNextReturnedBlock();
}
}
@@ -116,7 +116,7 @@ public class SpillingBuffer extends AbstractPagedOutputView {
this.blockCount++;
this.writer.close();
for (int i = this.numMemorySegmentsInWriter; i > 0; i--) {
- this.fullSegments.add(this.writer.getNextReturnedSegment());
+ this.fullSegments.add(this.writer.getNextReturnedBlock());
}
this.numMemorySegmentsInWriter = 0;
}
@@ -135,7 +135,7 @@ public class SpillingBuffer extends AbstractPagedOutputView {
this.externalInView.close();
}
- final BlockChannelReader reader = this.ioManager.createBlockChannelReader(this.writer.getChannelID());
+ final BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(this.writer.getChannelID());
this.externalInView = new HeaderlessChannelReaderInputView(reader, this.fullSegments, this.blockCount, this.numBytesInLastSegment, false);
return this.externalInView;
}
@@ -161,7 +161,7 @@ public class SpillingBuffer extends AbstractPagedOutputView {
// closing before the first flip, collect the memory in the writer
this.writer.close();
for (int i = this.numMemorySegmentsInWriter; i > 0; i--) {
- segments.add(this.writer.getNextReturnedSegment());
+ segments.add(this.writer.getNextReturnedBlock());
}
this.writer.closeAndDelete();
this.writer = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
index 3991167..e79439f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
@@ -80,7 +80,7 @@ public abstract class AbstractFileIOChannel implements FileIOChannel {
@Override
public abstract void close() throws IOException;
-
+
@Override
public void deleteChannel() {
if (!isClosed() || this.fileChannel.isOpen()) {
@@ -104,4 +104,9 @@ public abstract class AbstractFileIOChannel implements FileIOChannel {
deleteChannel();
}
}
+
+ @Override
+ public FileChannel getNioFileChannel() {
+ return fileChannel;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
index acfa71f..7a80b7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
@@ -18,12 +18,12 @@
package org.apache.flink.runtime.io.disk.iomanager;
+import org.apache.flink.core.memory.MemorySegment;
+
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.flink.core.memory.MemorySegment;
-
/**
* A reader that reads data in blocks from a file channel. The reader reads the blocks into a
* {@link org.apache.flink.core.memory.MemorySegment} in an asynchronous fashion. That is, a read
@@ -40,7 +40,7 @@ import org.apache.flink.core.memory.MemorySegment;
* or even whether the file was written in blocks of the same size, or in blocks at all. Ensuring that the
* writing and reading is consistent with each other (same blocks sizes) is up to the programmer.
*/
-public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySegment, ReadRequest> implements BlockChannelReader {
+public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySegment, ReadRequest> implements BlockChannelReader<MemorySegment> {
private final LinkedBlockingQueue<MemorySegment> returnSegments;
@@ -57,7 +57,7 @@ public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySeg
LinkedBlockingQueue<MemorySegment> returnSegments)
throws IOException
{
- super(channelID, requestQueue, new QueuingCallback(returnSegments), false);
+ super(channelID, requestQueue, new QueuingCallback<MemorySegment>(returnSegments), false);
this.returnSegments = returnSegments;
}
@@ -74,7 +74,12 @@ public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySeg
public void readBlock(MemorySegment segment) throws IOException {
addRequest(new SegmentReadRequest(this, segment));
}
-
+
+ @Override
+ public void seekToPosition(long position) throws IOException {
+ requestQueue.add(new SeekRequest(this, position));
+ }
+
/**
* Gets the next memory segment that has been filled with data by the reader. This method blocks until
* such a segment is available, or until an error occurs in the reader, or the reader is closed.
@@ -87,7 +92,7 @@ public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySeg
* @throws IOException Thrown, if an I/O error occurs in the reader while waiting for the request to return.
*/
@Override
- public MemorySegment getNextReturnedSegment() throws IOException {
+ public MemorySegment getNextReturnedBlock() throws IOException {
try {
while (true) {
final MemorySegment next = this.returnSegments.poll(1000, TimeUnit.MILLISECONDS);
@@ -115,9 +120,4 @@ public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySeg
public LinkedBlockingQueue<MemorySegment> getReturnQueue() {
return this.returnSegments;
}
-
- @Override
- public void seekToPosition(long position) throws IOException {
- this.requestQueue.add(new SeekRequest(this, position));
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
index 7e1681f..18d16a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.flink.core.memory.MemorySegment;
-public class AsynchronousBlockWriter extends AsynchronousBlockWriterWithCallback implements BlockChannelWriter {
+public class AsynchronousBlockWriter extends AsynchronousBlockWriterWithCallback implements BlockChannelWriter<MemorySegment> {
private final LinkedBlockingQueue<MemorySegment> returnSegments;
@@ -41,7 +41,7 @@ public class AsynchronousBlockWriter extends AsynchronousBlockWriterWithCallback
LinkedBlockingQueue<MemorySegment> returnSegments)
throws IOException
{
- super(channelID, requestQueue, new QueuingCallback(returnSegments));
+ super(channelID, requestQueue, new QueuingCallback<MemorySegment>(returnSegments));
this.returnSegments = returnSegments;
}
@@ -58,7 +58,7 @@ public class AsynchronousBlockWriter extends AsynchronousBlockWriterWithCallback
* @throws IOException Thrown, if an I/O error occurs in the writer while waiting for the request to return.
*/
@Override
- public MemorySegment getNextReturnedSegment() throws IOException {
+ public MemorySegment getNextReturnedBlock() throws IOException {
try {
while (true) {
final MemorySegment next = returnSegments.poll(1000, TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
index c9fbdd2..e5dab47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.memory.MemorySegment;
* An asynchronous implementation of the {@link BlockChannelWriterWithCallback} that queues I/O requests
* and calls a callback once they have been handled.
*/
-public class AsynchronousBlockWriterWithCallback extends AsynchronousFileIOChannel<MemorySegment, WriteRequest> implements BlockChannelWriterWithCallback {
+public class AsynchronousBlockWriterWithCallback extends AsynchronousFileIOChannel<MemorySegment, WriteRequest> implements BlockChannelWriterWithCallback<MemorySegment> {
/**
* Creates a new asynchronous block writer for the given channel.
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileReader.java
new file mode 100644
index 0000000..ba5f0ec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileReader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+/** A {@link BufferFileReader} that reads file data into {@link Buffer}s by submitting read requests through {@link AsynchronousFileIOChannel}; completed requests go to the callback given to the constructor. */
+public class AsynchronousBufferFileReader extends AsynchronousFileIOChannel<Buffer, ReadRequest> implements BufferFileReader {
+ // End-of-file flag shared with every BufferReadRequest this reader creates; presumably the request sets it when a read hits EOF (confirm in BufferReadRequest).
+ private final AtomicBoolean hasReachedEndOfFile = new AtomicBoolean();
+ /** Creates a reader for {@code channelID} that enqueues on {@code requestQueue}. NOTE(review): the {@code false} flag presumably opens the channel read-only; confirm in AsynchronousFileIOChannel. */
+ protected AsynchronousBufferFileReader(ID channelID, RequestQueue<ReadRequest> requestQueue, RequestDoneCallback<Buffer> callback) throws IOException {
+ super(channelID, requestQueue, callback, false);
+ }
+ /** Enqueues a read request that fills {@code buffer}; the buffer is presumably handed to the callback once the request completes. */
+ @Override
+ public void readInto(Buffer buffer) throws IOException {
+ addRequest(new BufferReadRequest(this, buffer, hasReachedEndOfFile));
+ }
+ /** Enqueues a seek to {@code position}. NOTE(review): unlike {@link #readInto}, this adds to {@code requestQueue} directly instead of going through {@code addRequest}; confirm that bypass is intended. */
+ @Override
+ public void seekToPosition(long position) throws IOException {
+ requestQueue.add(new SeekRequest(this, position));
+ }
+ /** Returns the end-of-file flag shared with this reader's read requests. */
+ @Override
+ public boolean hasReachedEndOfFile() {
+ return hasReachedEndOfFile.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileSegmentReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileSegmentReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileSegmentReader.java
new file mode 100644
index 0000000..c2a277a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileSegmentReader.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+/** A {@link BufferFileSegmentReader} that submits asynchronous read requests producing {@link FileSegment}s; completed requests go to the callback given to the constructor. */
+public class AsynchronousBufferFileSegmentReader extends AsynchronousFileIOChannel<FileSegment, ReadRequest> implements BufferFileSegmentReader {
+ // End-of-file flag shared with every FileSegmentReadRequest this reader creates; presumably the request sets it when EOF is reached (confirm in FileSegmentReadRequest).
+ private final AtomicBoolean hasReachedEndOfFile = new AtomicBoolean();
+ /** Creates a reader for {@code channelID} that enqueues on {@code requestQueue}. NOTE(review): the {@code false} flag presumably opens the channel read-only; confirm in AsynchronousFileIOChannel. */
+ protected AsynchronousBufferFileSegmentReader(ID channelID, RequestQueue<ReadRequest> requestQueue, RequestDoneCallback<FileSegment> callback) throws IOException {
+ super(channelID, requestQueue, callback, false);
+ }
+ /** Enqueues a segment read request. No target buffer is supplied, so the request presumably records a segment of the file rather than copying its data; confirm in FileSegmentReadRequest. */
+ @Override
+ public void read() throws IOException {
+ addRequest(new FileSegmentReadRequest(this, hasReachedEndOfFile));
+ }
+ /** Enqueues a seek to {@code position}. NOTE(review): unlike {@link #read}, this adds to {@code requestQueue} directly instead of going through {@code addRequest}; confirm that bypass is intended. */
+ @Override
+ public void seekTo(long position) throws IOException {
+ requestQueue.add(new SeekRequest(this, position));
+ }
+ /** Returns the end-of-file flag shared with this reader's segment read requests. */
+ @Override
+ public boolean hasReachedEndOfFile() {
+ return hasReachedEndOfFile.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
new file mode 100644
index 0000000..14bb8f7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.io.IOException;
+/** A {@link BufferFileWriter} that writes {@link Buffer}s by queueing write requests; each buffer is recycled once its request succeeds or fails. */
+public class AsynchronousBufferFileWriter extends AsynchronousFileIOChannel<Buffer, WriteRequest> implements BufferFileWriter {
+ // RecyclingCallback holds no state, so one static instance is shared by all writers.
+ private static final RecyclingCallback CALLBACK = new RecyclingCallback();
+ /** Creates a writer for {@code channelID} that enqueues on {@code requestQueue}. NOTE(review): the {@code true} flag presumably opens the channel for writing; confirm in AsynchronousFileIOChannel. */
+ protected AsynchronousBufferFileWriter(ID channelID, RequestQueue<WriteRequest> requestQueue) throws IOException {
+ super(channelID, requestQueue, CALLBACK, true);
+ }
+ /** Enqueues a write of {@code buffer}. The shared callback recycles the buffer afterwards, so callers presumably must not recycle it themselves. */
+ @Override
+ public void writeBlock(Buffer buffer) throws IOException {
+ addRequest(new BufferWriteRequest(this, buffer));
+ }
+ /** Returns the inherited {@code requestsNotReturned} counter (presumably requests submitted but not yet completed). */
+ @Override
+ public int getNumberOfOutstandingRequests() {
+ return requestsNotReturned.get();
+ }
+ /** Delegates to the inherited implementation. NOTE(review): presumably overridden only to expose it publicly through BufferFileWriter; confirm. */
+ @Override
+ public boolean registerAllRequestsProcessedListener(NotificationListener listener) throws IOException {
+ return super.registerAllRequestsProcessedListener(listener);
+ }
+
+ /**
+ * Recycles the buffer once its I/O request has been handled, on success and on failure alike.
+ */
+ private static class RecyclingCallback implements RequestDoneCallback<Buffer> {
+
+ @Override
+ public void requestSuccessful(Buffer buffer) {
+ buffer.recycle();
+ }
+ // NOTE(review): requestFailed discards the IOException, so write failures are not surfaced through this callback; confirm they are reported elsewhere.
+ @Override
+ public void requestFailed(Buffer buffer, IOException e) {
+ buffer.recycle();
+ }
+ }
+}