You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:02 UTC
[38/63] [abbrv] Finalize ExecutionGraph state machine and calls
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index adc0f09..bbc0c97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -16,992 +16,388 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.executiongraph;
-import java.io.IOException;
+import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
+import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
+import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
+
import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.HashSet;
import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.deployment.ChannelDeploymentDescriptor;
+
+import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionListener;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.ExecutionStateTransition;
-import org.apache.flink.runtime.instance.AllocatedResource;
-import org.apache.flink.runtime.instance.AllocationID;
-import org.apache.flink.runtime.io.network.gates.GateID;
-import org.apache.flink.runtime.taskmanager.TaskOperationResult;
-import org.apache.flink.runtime.taskmanager.TaskCancelResult;
-import org.apache.flink.runtime.taskmanager.TaskKillResult;
-import org.apache.flink.runtime.taskmanager.TaskSubmissionResult;
-import org.apache.flink.runtime.taskmanager.TaskOperationResult.ReturnCode;
-import org.apache.flink.runtime.util.AtomicEnum;
-import org.apache.flink.runtime.util.SerializableArrayList;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
/**
- * An execution vertex represents an instance of a task in a Nephele job. An execution vertex
- * is initially created from a job vertex and always belongs to exactly one group vertex.
- * It is possible to duplicate execution vertices in order to distribute a task to several different
- * task managers and process the task in parallel.
- * <p>
- * This class is thread-safe.
- *
+ * The ExecutionVertex is a parallel subtask of the execution. It may be executed once or several times;
+ * each time it is executed, it spawns a new {@link Execution}.
*/
-public final class ExecutionVertex {
-
- /**
- * The log object used for debugging.
- */
- private static final Logger LOG = LoggerFactory.getLogger(ExecutionVertex.class);
-
- /**
- * The ID of the vertex.
- */
- private final ExecutionVertexID vertexID;
-
- /**
- * The group vertex this vertex belongs to.
- */
- private final ExecutionGroupVertex groupVertex;
-
- /**
- * The execution graph is vertex belongs to.
- */
- private final ExecutionGraph executionGraph;
-
- /**
- * The allocated resources assigned to this vertex.
- */
- private final AtomicReference<AllocatedResource> allocatedResource = new AtomicReference<AllocatedResource>(null);
-
- /**
- * The allocation ID identifying the allocated resources used by this vertex
- * within the instance.
- */
- private volatile AllocationID allocationID = null;
-
- /**
- * A list of {@link VertexAssignmentListener} objects to be notified about changes in the instance assignment.
- */
- private final CopyOnWriteArrayList<VertexAssignmentListener> vertexAssignmentListeners = new CopyOnWriteArrayList<VertexAssignmentListener>();
-
- /**
- * A map of {@link ExecutionListener} objects to be notified about the state changes of a vertex.
- */
- private final ConcurrentMap<Integer, ExecutionListener> executionListeners = new ConcurrentSkipListMap<Integer, ExecutionListener>();
-
- /**
- * The current execution state of the task represented by this vertex
- */
- private final AtomicEnum<ExecutionState> executionState = new AtomicEnum<ExecutionState>(ExecutionState.CREATED);
-
- /**
- * The output gates attached to this vertex.
- */
- private final ExecutionGate[] outputGates;
-
- /**
- * The input gates attached to his vertex.
- */
- private final ExecutionGate[] inputGates;
-
- /**
- * The index of this vertex in the vertex group.
- */
- private volatile int indexInVertexGroup = 0;
-
- /**
- * Stores the number of times the vertex may be still be started before the corresponding task is considered to be
- * failed.
- */
- private final AtomicInteger retriesLeft;
-
-
- /**
- * The execution pipeline this vertex is part of.
- */
- private final AtomicReference<ExecutionPipeline> executionPipeline = new AtomicReference<ExecutionPipeline>(null);
-
- /**
- * Flag to indicate whether the vertex has been requested to cancel while in state STARTING
- */
- private final AtomicBoolean cancelRequested = new AtomicBoolean(false);
-
- /**
- * Create a new execution vertex and instantiates its environment.
- *
- * @param executionGraph
- * the execution graph the new vertex belongs to
- * @param groupVertex
- * the group vertex the new vertex belongs to
- * @param numberOfOutputGates
- * the number of output gates attached to this vertex
- * @param numberOfInputGates
- * the number of input gates attached to this vertex
- */
- public ExecutionVertex(final ExecutionGraph executionGraph, final ExecutionGroupVertex groupVertex,
- final int numberOfOutputGates, final int numberOfInputGates) {
- this(new ExecutionVertexID(), executionGraph, groupVertex, numberOfOutputGates, numberOfInputGates);
-
- this.groupVertex.addInitialSubtask(this);
- }
-
- /**
- * Private constructor used to duplicate execution vertices.
- *
- * @param vertexID
- * the ID of the new execution vertex.
- * @param executionGraph
- * the execution graph the new vertex belongs to
- * @param groupVertex
- * the group vertex the new vertex belongs to
- * @param numberOfOutputGates
- * the number of output gates attached to this vertex
- * @param numberOfInputGates
- * the number of input gates attached to this vertex
- */
- private ExecutionVertex(final ExecutionVertexID vertexID, final ExecutionGraph executionGraph,
- final ExecutionGroupVertex groupVertex, final int numberOfOutputGates, final int numberOfInputGates) {
-
- this.vertexID = vertexID;
- this.executionGraph = executionGraph;
- this.groupVertex = groupVertex;
-
- this.retriesLeft = new AtomicInteger(groupVertex.getNumberOfExecutionRetries());
-
- this.outputGates = new ExecutionGate[numberOfOutputGates];
- this.inputGates = new ExecutionGate[numberOfInputGates];
+public class ExecutionVertex {
- // Register vertex with execution graph
- this.executionGraph.registerExecutionVertex(this);
-
- // Register the vertex itself as a listener for state changes
- registerExecutionListener(this.executionGraph);
- }
-
- /**
- * Returns the group vertex this execution vertex belongs to.
- *
- * @return the group vertex this execution vertex belongs to
- */
- public ExecutionGroupVertex getGroupVertex() {
- return this.groupVertex;
- }
+ @SuppressWarnings("unused")
+ private static final Logger LOG = ExecutionGraph.LOG;
+
+ private static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;
+
+ // --------------------------------------------------------------------------------------------
+
+ private final ExecutionJobVertex jobVertex;
+
+ private final IntermediateResultPartition[] resultPartitions;
+
+ private final ExecutionEdge[][] inputEdges;
+
+ private final int subTaskIndex;
+
+ private final List<Execution> priorExecutions;
+
+ private volatile Execution currentExecution; // this field must never be null
+
+ // --------------------------------------------------------------------------------------------
- /**
- * Returns the name of the execution vertex.
- *
- * @return the name of the execution vertex
- */
- public String getName() {
- return this.groupVertex.getName();
+ public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets) {
+ this(jobVertex, subTaskIndex, producedDataSets, System.currentTimeMillis());
}
-
- /**
- * Returns a duplicate of this execution vertex.
- *
- * @param preserveVertexID
- * <code>true</code> to copy the vertex's ID to the duplicated vertex, <code>false</code> to create a new ID
- * @return a duplicate of this execution vertex
- */
- public ExecutionVertex duplicateVertex(final boolean preserveVertexID) {
-
- ExecutionVertexID newVertexID;
- if (preserveVertexID) {
- newVertexID = this.vertexID;
- } else {
- newVertexID = new ExecutionVertexID();
- }
-
- final ExecutionVertex duplicatedVertex = new ExecutionVertex(newVertexID, this.executionGraph,
- this.groupVertex, this.outputGates.length, this.inputGates.length);
-
- // Duplicate gates
- for (int i = 0; i < this.outputGates.length; ++i) {
- duplicatedVertex.outputGates[i] = new ExecutionGate(new GateID(), duplicatedVertex,
- this.outputGates[i].getGroupEdge(), false);
- }
-
- for (int i = 0; i < this.inputGates.length; ++i) {
- duplicatedVertex.inputGates[i] = new ExecutionGate(new GateID(), duplicatedVertex,
- this.inputGates[i].getGroupEdge(), true);
+
+ public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets, long createTimestamp) {
+ this.jobVertex = jobVertex;
+ this.subTaskIndex = subTaskIndex;
+
+ this.resultPartitions = new IntermediateResultPartition[producedDataSets.length];
+ for (int i = 0; i < producedDataSets.length; i++) {
+ IntermediateResultPartition irp = new IntermediateResultPartition(producedDataSets[i], this, subTaskIndex);
+ this.resultPartitions[i] = irp;
+ producedDataSets[i].setPartition(subTaskIndex, irp);
}
-
- // TODO set new profiling record with new vertex id
- duplicatedVertex.setAllocatedResource(this.allocatedResource.get());
-
- return duplicatedVertex;
+
+ this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
+ this.priorExecutions = new CopyOnWriteArrayList<Execution>();
+
+ this.currentExecution = new Execution(this, 0, createTimestamp);
}
-
- /**
- * Inserts the output gate at the given position.
- *
- * @param pos
- * the position to insert the output gate
- * @param outputGate
- * the output gate to be inserted
- */
- void insertOutputGate(final int pos, final ExecutionGate outputGate) {
-
- if (this.outputGates[pos] != null) {
- throw new IllegalStateException("Output gate at position " + pos + " is not null");
- }
-
- this.outputGates[pos] = outputGate;
+
+
+ // --------------------------------------------------------------------------------------------
+ // Properties
+ // --------------------------------------------------------------------------------------------
+
+ public JobID getJobId() {
+ return this.jobVertex.getJobId();
}
-
- /**
- * Inserts the input gate at the given position.
- *
- * @param pos
- * the position to insert the input gate
- * @param outputGate
- * the input gate to be inserted
- */
- void insertInputGate(final int pos, final ExecutionGate inputGate) {
-
- if (this.inputGates[pos] != null) {
- throw new IllegalStateException("Input gate at position " + pos + " is not null");
- }
-
- this.inputGates[pos] = inputGate;
+
+ public ExecutionJobVertex getJobVertex() {
+ return jobVertex;
}
-
- /**
- * Returns a duplicate of this execution vertex. The duplicated vertex receives
- * a new vertex ID.
- *
- * @return a duplicate of this execution vertex.
- */
- public ExecutionVertex splitVertex() {
-
- return duplicateVertex(false);
+
+ public JobVertexID getJobvertexId() {
+ return this.jobVertex.getJobVertexId();
}
-
- /**
- * Returns this execution vertex's current execution status.
- *
- * @return this execution vertex's current execution status
- */
- public ExecutionState getExecutionState() {
- return this.executionState.get();
+
+ public String getTaskName() {
+ return this.jobVertex.getJobVertex().getName();
}
-
- /**
- * Updates the vertex's current execution state through the job's executor service.
- *
- * @param newExecutionState
- * the new execution state
- * @param optionalMessage
- * an optional message related to the state change
- */
- public void updateExecutionStateAsynchronously(final ExecutionState newExecutionState,
- final String optionalMessage) {
-
- final Runnable command = new Runnable() {
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void run() {
-
- updateExecutionState(newExecutionState, optionalMessage);
- }
- };
-
- this.executionGraph.executeCommand(command);
+
+ public int getTotalNumberOfParallelSubtasks() {
+ return this.jobVertex.getParallelism();
}
-
- /**
- * Updates the vertex's current execution state through the job's executor service.
- *
- * @param newExecutionState
- * the new execution state
- */
- public void updateExecutionStateAsynchronously(final ExecutionState newExecutionState) {
-
- updateExecutionStateAsynchronously(newExecutionState, null);
+
+ public int getParallelSubtaskIndex() {
+ return this.subTaskIndex;
}
-
- /**
- * Updates the vertex's current execution state.
- *
- * @param newExecutionState
- * the new execution state
- */
- public ExecutionState updateExecutionState(final ExecutionState newExecutionState) {
- return updateExecutionState(newExecutionState, null);
+
+ public int getNumberOfInputs() {
+ return this.inputEdges.length;
}
-
- /**
- * Updates the vertex's current execution state.
- *
- * @param newExecutionState
- * the new execution state
- * @param optionalMessage
- * an optional message related to the state change
- */
- public ExecutionState updateExecutionState(ExecutionState newExecutionState, final String optionalMessage) {
-
- if (newExecutionState == null) {
- throw new IllegalArgumentException("Argument newExecutionState must not be null");
- }
-
- final ExecutionState currentExecutionState = this.executionState.get();
- if (currentExecutionState == ExecutionState.CANCELING) {
-
- // If we are in CANCELING, ignore state changes to FINISHING
- if (newExecutionState == ExecutionState.FINISHING) {
- return currentExecutionState;
- }
-
- // Rewrite FINISHED to CANCELED if the task has been marked to be canceled
- if (newExecutionState == ExecutionState.FINISHED) {
- LOG.info("Received transition from CANCELING to FINISHED for vertex " + toString()
- + ", converting it to CANCELED");
- newExecutionState = ExecutionState.CANCELED;
- }
- }
-
- // Check and save the new execution state
- final ExecutionState previousState = this.executionState.getAndSet(newExecutionState);
- if (previousState == newExecutionState) {
- return previousState;
- }
-
- // Check the transition
- ExecutionStateTransition.checkTransition(true, toString(), previousState, newExecutionState);
-
- // Notify the listener objects
- final Iterator<ExecutionListener> it = this.executionListeners.values().iterator();
- while (it.hasNext()) {
- it.next().executionStateChanged(this.executionGraph.getJobID(), this.vertexID, newExecutionState,
- optionalMessage);
+
+ public ExecutionEdge[] getInputEdges(int input) {
+ if (input < 0 || input >= this.inputEdges.length) {
+ throw new IllegalArgumentException(String.format("Input %d is out of range [0..%d)", input, this.inputEdges.length));
}
-
- // The vertex was requested to be canceled by another thread
- checkCancelRequestedFlag();
-
- return previousState;
+ return inputEdges[input];
}
-
- public boolean compareAndUpdateExecutionState(final ExecutionState expected, final ExecutionState update) {
-
- if (update == null) {
- throw new IllegalArgumentException("Argument update must not be null");
- }
-
- if (!this.executionState.compareAndSet(expected, update)) {
- return false;
- }
-
- // Check the transition
- ExecutionStateTransition.checkTransition(true, toString(), expected, update);
-
- // Notify the listener objects
- final Iterator<ExecutionListener> it = this.executionListeners.values().iterator();
- while (it.hasNext()) {
- it.next().executionStateChanged(this.executionGraph.getJobID(), this.vertexID, update,
- null);
- }
-
- // Check if the vertex was requested to be canceled by another thread
- checkCancelRequestedFlag();
-
- return true;
+
+ public Execution getCurrentExecutionAttempt() {
+ return currentExecution;
}
-
- /**
- * Checks if another thread requested the vertex to cancel while it was in state STARTING. If so, the method clears
- * the respective flag and repeats the cancel request.
- */
- private void checkCancelRequestedFlag() {
-
- if (this.cancelRequested.compareAndSet(true, false)) {
- final TaskCancelResult tsr = cancelTask();
- if (tsr.getReturnCode() != AbstractTaskResult.TaskOperationResult.SUCCESS
- && tsr.getReturnCode() != AbstractTaskResult.TaskOperationResult.TASK_NOT_FOUND) {
- LOG.error("Unable to cancel vertex " + this + ": " + tsr.getReturnCode().toString()
- + ((tsr.getDescription() != null) ? (" (" + tsr.getDescription() + ")") : ""));
- }
- }
+
+ public ExecutionState getExecutionState() {
+ return currentExecution.getState();
}
-
- /**
- * Assigns the execution vertex with an {@link AllocatedResource}.
- *
- * @param allocatedResource
- * the resources which are supposed to be allocated to this vertex
- */
- public void setAllocatedResource(final AllocatedResource allocatedResource) {
-
- if (allocatedResource == null) {
- throw new IllegalArgumentException("Argument allocatedResource must not be null");
- }
-
- final AllocatedResource previousResource = this.allocatedResource.getAndSet(allocatedResource);
- if (previousResource != null) {
- previousResource.removeVertexFromResource(this);
- }
-
- allocatedResource.assignVertexToResource(this);
-
- // Notify all listener objects
- final Iterator<VertexAssignmentListener> it = this.vertexAssignmentListeners.iterator();
- while (it.hasNext()) {
- it.next().vertexAssignmentChanged(this.vertexID, allocatedResource);
- }
+
+ public long getStateTimestamp(ExecutionState state) {
+ return currentExecution.getStateTimestamp(state);
}
-
- /**
- * Returns the allocated resources assigned to this execution vertex.
- *
- * @return the allocated resources assigned to this execution vertex
- */
- public AllocatedResource getAllocatedResource() {
-
- return this.allocatedResource.get();
+
+ public Throwable getFailureCause() {
+ return currentExecution.getFailureCause();
}
-
- /**
- * Returns the allocation ID which identifies the resources used
- * by this vertex within the assigned instance.
- *
- * @return the allocation ID which identifies the resources used
- * by this vertex within the assigned instance or <code>null</code> if the instance is still assigned to a
- * {@link org.apache.flink.runtime.instance.DummyInstance}.
- */
- public AllocationID getAllocationID() {
- return this.allocationID;
+
+ public AllocatedSlot getCurrentAssignedResource() {
+ return currentExecution.getAssignedResource();
}
-
- /**
- * Returns the ID of this execution vertex.
- *
- * @return the ID of this execution vertex
- */
- public ExecutionVertexID getID() {
- return this.vertexID;
+
+ public ExecutionGraph getExecutionGraph() {
+ return this.jobVertex.getGraph();
}
-
- /**
- * Returns the number of predecessors, i.e. the number of vertices
- * which connect to this vertex.
- *
- * @return the number of predecessors
- */
- public int getNumberOfPredecessors() {
-
- int numberOfPredecessors = 0;
-
- for (int i = 0; i < this.inputGates.length; ++i) {
- numberOfPredecessors += this.inputGates[i].getNumberOfEdges();
+
+ // --------------------------------------------------------------------------------------------
+ // Graph building
+ // --------------------------------------------------------------------------------------------
+
+ public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
+
+ final DistributionPattern pattern = edge.getDistributionPattern();
+ final IntermediateResultPartition[] sourcePartitions = source.getPartitions();
+
+ ExecutionEdge[] edges = null;
+
+ switch (pattern) {
+ case POINTWISE:
+ edges = connectPointwise(sourcePartitions, inputNumber);
+ break;
+
+ case BIPARTITE:
+ edges = connectAllToAll(sourcePartitions, inputNumber);
+ break;
+
+ default:
+ throw new RuntimeException("Unrecognized distribution pattern.");
+
}
-
- return numberOfPredecessors;
- }
-
- /**
- * Returns the number of successors, i.e. the number of vertices
- * this vertex is connected to.
- *
- * @return the number of successors
- */
- public int getNumberOfSuccessors() {
-
- int numberOfSuccessors = 0;
-
- for (int i = 0; i < this.outputGates.length; ++i) {
- numberOfSuccessors += this.outputGates[i].getNumberOfEdges();
+
+ this.inputEdges[inputNumber] = edges;
+
+ ExecutionGraph graph = getExecutionGraph();
+
+ // add the consumers to the source
+ // for now (until the receiver-initiated handshake is in place), we need to register the
+ // edges with the execution graph
+ for (ExecutionEdge ee : edges) {
+ ee.getSource().addConsumer(ee, consumerNumber);
+ graph.registerExecutionEdge(ee);
}
-
- return numberOfSuccessors;
}
-
- public ExecutionVertex getPredecessor(int index) {
-
- if (index < 0) {
- throw new IllegalArgumentException("Argument index must be greather or equal to 0");
- }
-
- for (int i = 0; i < this.inputGates.length; ++i) {
-
- final ExecutionGate inputGate = this.inputGates[i];
- final int numberOfEdges = inputGate.getNumberOfEdges();
-
- if (index >= 0 && index < numberOfEdges) {
-
- final ExecutionEdge edge = inputGate.getEdge(index);
- return edge.getOutputGate().getVertex();
- }
- index -= numberOfEdges;
+
+ private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
+ ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];
+
+ for (int i = 0; i < sourcePartitions.length; i++) {
+ IntermediateResultPartition irp = sourcePartitions[i];
+ edges[i] = new ExecutionEdge(irp, this, inputNumber);
}
-
- return null;
+
+ return edges;
}
-
- public ExecutionVertex getSuccessor(int index) {
-
- if (index < 0) {
- throw new IllegalArgumentException("Argument index must be greather or equal to 0");
- }
-
- for (int i = 0; i < this.outputGates.length; ++i) {
-
- final ExecutionGate outputGate = this.outputGates[i];
- final int numberOfEdges = outputGate.getNumberOfEdges();
-
- if (index >= 0 && index < numberOfEdges) {
-
- final ExecutionEdge edge = outputGate.getEdge(index);
- return edge.getInputGate().getVertex();
+
+ private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
+ final int numSources = sourcePartitions.length;
+ final int parallelism = getTotalNumberOfParallelSubtasks();
+
+ // simple case: same number of sources as targets
+ if (numSources == parallelism) {
+ return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) };
+ }
+ else if (numSources < parallelism) {
+
+ int sourcePartition;
+
+ // check whether the pattern is regular or irregular
+ // we use integer arithmetic for the regular case, and floating point arithmetic with truncation for the irregular case
+ if (parallelism % numSources == 0) {
+ // same number of targets per source
+ int factor = parallelism / numSources;
+ sourcePartition = subTaskIndex / factor;
+ }
+ else {
+ // different number of targets per source
+ float factor = ((float) parallelism) / numSources;
+ sourcePartition = (int) (subTaskIndex / factor);
+ }
+
+ return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) };
+ }
+ else {
+ if (numSources % parallelism == 0) {
+ // same number of sources per target
+ int factor = numSources / parallelism;
+ int startIndex = subTaskIndex * factor;
+
+ ExecutionEdge[] edges = new ExecutionEdge[factor];
+ for (int i = 0; i < factor; i++) {
+ edges[i] = new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber);
+ }
+ return edges;
+ }
+ else {
+ float factor = ((float) numSources) / parallelism;
+
+ int start = (int) (subTaskIndex * factor);
+ int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ?
+ sourcePartitions.length :
+ (int) ((subTaskIndex + 1) * factor);
+
+ ExecutionEdge[] edges = new ExecutionEdge[end - start];
+ for (int i = 0; i < edges.length; i++) {
+ edges[i] = new ExecutionEdge(sourcePartitions[start + i], this, inputNumber);
+ }
+
+ return edges;
}
- index -= numberOfEdges;
}
-
- return null;
-
- }
-
- /**
- * Checks if this vertex is an input vertex in its stage, i.e. has either no
- * incoming connections or only incoming connections to group vertices in a lower stage.
- *
- * @return <code>true</code> if this vertex is an input vertex, <code>false</code> otherwise
- */
- public boolean isInputVertex() {
-
- return this.groupVertex.isInputVertex();
- }
-
- /**
- * Checks if this vertex is an output vertex in its stage, i.e. has either no
- * outgoing connections or only outgoing connections to group vertices in a higher stage.
- *
- * @return <code>true</code> if this vertex is an output vertex, <code>false</code> otherwise
- */
- public boolean isOutputVertex() {
-
- return this.groupVertex.isOutputVertex();
- }
-
- /**
- * Returns the index of this vertex in the vertex group.
- *
- * @return the index of this vertex in the vertex group
- */
- public int getIndexInVertexGroup() {
-
- return this.indexInVertexGroup;
- }
-
- /**
- * Sets the vertex' index in the vertex group.
- *
- * @param indexInVertexGroup
- * the vertex' index in the vertex group
- */
- void setIndexInVertexGroup(final int indexInVertexGroup) {
-
- this.indexInVertexGroup = indexInVertexGroup;
- }
-
- /**
- * Returns the number of output gates attached to this vertex.
- *
- * @return the number of output gates attached to this vertex
- */
- public int getNumberOfOutputGates() {
-
- return this.outputGates.length;
}
/**
- * Returns the output gate with the given index.
+ * Gets the location preferences of this task, determined by the locations of the predecessors from which
+ * it receives input data.
+ * If there are more than MAX_DISTINCT_LOCATIONS_TO_CONSIDER different locations of source data, this
+ * method returns {@code null} to indicate no location preference.
*
- * @param index
- * the index of the output gate to return
- * @return the output gate with the given index
- */
- public ExecutionGate getOutputGate(final int index) {
-
- return this.outputGates[index];
- }
-
- /**
- * Returns the number of input gates attached to this vertex.
- *
- * @return the number of input gates attached to this vertex
- */
- public int getNumberOfInputGates() {
-
- return this.inputGates.length;
- }
-
- /**
- * Returns the input gate with the given index.
- *
- * @param index
- * the index of the input gate to return
- * @return the input gate with the given index
- */
- public ExecutionGate getInputGate(final int index) {
-
- return this.inputGates[index];
- }
-
- /**
- * Deploys and starts the task represented by this vertex
- * on the assigned instance.
- *
- * @return the result of the task submission attempt
- */
- public TaskSubmissionResult startTask() {
-
- final AllocatedResource ar = this.allocatedResource.get();
-
- if (ar == null) {
- final TaskSubmissionResult result = new TaskSubmissionResult(getID(),
- AbstractTaskResult.TaskOperationResult.NO_INSTANCE);
- result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
- return result;
- }
-
- final List<TaskDeploymentDescriptor> tasks = new SerializableArrayList<TaskDeploymentDescriptor>();
- tasks.add(constructDeploymentDescriptor());
-
- try {
- final List<TaskSubmissionResult> results = ar.getInstance().submitTasks(tasks);
-
- return results.get(0);
-
- } catch (IOException e) {
- final TaskSubmissionResult result = new TaskSubmissionResult(getID(),
- AbstractTaskResult.TaskOperationResult.IPC_ERROR);
- result.setDescription(StringUtils.stringifyException(e));
- return result;
- }
- }
-
- /**
- * Kills and removes the task represented by this vertex from the instance it is currently running on. If the
- * corresponding task is not in the state <code>RUNNING</code>, this call will be ignored. If the call has been
- * executed
- * successfully, the task will change the state <code>FAILED</code>.
- *
- * @return the result of the task kill attempt
+ * @return The preferred locations for this vertex execution, or null, if there is no preference.
*/
- public TaskKillResult killTask() {
-
- final ExecutionState state = this.executionState.get();
-
- if (state != ExecutionState.RUNNING) {
- final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.TaskOperationResult.ILLEGAL_STATE);
- result.setDescription("Vertex " + this.toString() + " is in state " + state);
- return result;
- }
-
- final AllocatedResource ar = this.allocatedResource.get();
-
- if (ar == null) {
- final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.TaskOperationResult.NO_INSTANCE);
- result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
- return result;
- }
-
- try {
- return ar.getInstance().killTask(this.vertexID);
- } catch (IOException e) {
- final TaskKillResult result = new TaskKillResult(getID(), AbstractTaskResult.TaskOperationResult.IPC_ERROR);
- result.setDescription(StringUtils.stringifyException(e));
- return result;
+ public Iterable<Instance> getPreferredLocations() {
+ HashSet<Instance> locations = new HashSet<Instance>();
+
+ for (int i = 0; i < inputEdges.length; i++) {
+ ExecutionEdge[] sources = inputEdges[i];
+ if (sources != null) {
+ for (int k = 0; k < sources.length; k++) {
+ Instance source = sources[k].getSource().getProducer().getCurrentAssignedResource().getInstance();
+ locations.add(source);
+ if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+ return null;
+ }
+ }
+ }
}
+ return locations;
}
-
- /**
- * Cancels and removes the task represented by this vertex
- * from the instance it is currently running on. If the task
- * is not currently running, its execution state is simply
- * updated to <code>CANCELLED</code>.
- *
- * @return the result of the task cancel attempt
- */
- public TaskCancelResult cancelTask() {
-
- while (true) {
-
- final ExecutionState previousState = this.executionState.get();
-
- if (previousState == ExecutionState.CANCELED) {
- return new TaskCancelResult(getID(), AbstractTaskResult.TaskOperationResult.SUCCESS);
- }
-
- if (previousState == ExecutionState.FAILED) {
- return new TaskCancelResult(getID(), AbstractTaskResult.TaskOperationResult.SUCCESS);
- }
-
- if (previousState == ExecutionState.FINISHED) {
- return new TaskCancelResult(getID(), AbstractTaskResult.TaskOperationResult.SUCCESS);
+
+ // --------------------------------------------------------------------------------------------
+ // Actions
+ // --------------------------------------------------------------------------------------------
+
+ public void resetForNewExecution() {
+ synchronized (priorExecutions) {
+ Execution execution = currentExecution;
+ ExecutionState state = execution.getState();
+
+ if (state == FINISHED || state == CANCELED || state == FAILED) {
+ priorExecutions.add(execution);
+ currentExecution = new Execution(this, execution.getAttemptNumber()+1, System.currentTimeMillis());
}
-
- // The vertex has already received a cancel request
- if (previousState == ExecutionState.CANCELING) {
- return new TaskCancelResult(getID(), ReturnCode.SUCCESS);
- }
-
- // Do not trigger the cancel request when vertex is in state STARTING, this might cause a race between RPC
- // calls.
- if (previousState == ExecutionState.STARTING) {
-
- this.cancelRequested.set(true);
-
- // We had a race, so we unset the flag and take care of cancellation ourselves
- if (this.executionState.get() != ExecutionState.STARTING) {
- this.cancelRequested.set(false);
- continue;
- }
-
- return new TaskCancelResult(getID(), AbstractTaskResult.TaskOperationResult.SUCCESS);
- }
-
- // Check if we had a race. If state change is accepted, send cancel request
- if (compareAndUpdateExecutionState(previousState, ExecutionState.CANCELING)) {
-
- if (this.groupVertex.getStageNumber() != this.executionGraph.getIndexOfCurrentExecutionStage()) {
- // Set to canceled directly
- updateExecutionState(ExecutionState.CANCELED, null);
- return new TaskCancelResult(getID(), AbstractTaskResult.TaskOperationResult.SUCCESS);
- }
-
- if (previousState != ExecutionState.RUNNING && previousState != ExecutionState.FINISHING) {
- // Set to canceled directly
- updateExecutionState(ExecutionState.CANCELED, null);
- return new TaskCancelResult(getID(), AbstractTaskResult.TaskOperationResult.SUCCESS);
- }
-
- final AllocatedResource ar = this.allocatedResource.get();
-
- if (ar == null) {
- final TaskCancelResult result = new TaskCancelResult(getID(),
- AbstractTaskResult.TaskOperationResult.NO_INSTANCE);
- result.setDescription("Assigned instance of vertex " + this.toString() + " is null!");
- return result;
- }
-
- try {
- return ar.getInstance().cancelTask(this.vertexID);
-
- } catch (IOException e) {
- final TaskCancelResult result = new TaskCancelResult(getID(),
- AbstractTaskResult.TaskOperationResult.IPC_ERROR);
- result.setDescription(StringUtils.stringifyException(e));
- return result;
- }
+ else {
+ throw new IllegalStateException("Cannot reset a vertex that is in state " + state);
}
}
}
-
- /**
- * Returns the {@link ExecutionGraph} this vertex belongs to.
- *
- * @return the {@link ExecutionGraph} this vertex belongs to
- */
- public ExecutionGraph getExecutionGraph() {
-
- return this.executionGraph;
+
+ public void scheduleForExecution(DefaultScheduler scheduler, boolean queued) throws NoResourceAvailableException {
+ this.currentExecution.scheduleForExecution(scheduler, queued);
}
-
-
- @Override
- public String toString() {
-
- final StringBuilder sb = new StringBuilder(this.groupVertex.getName());
- sb.append(" (");
- sb.append(this.indexInVertexGroup + 1);
- sb.append('/');
- sb.append(this.groupVertex.getCurrentNumberOfGroupMembers());
- sb.append(')');
-
- return sb.toString();
+
+ public void deployToSlot(AllocatedSlot slot) throws JobException {
+ this.currentExecution.deployToSlot(slot);
}
-
- /**
- * Returns the task represented by this vertex has
- * a retry attempt left in case of an execution
- * failure.
- *
- * @return <code>true</code> if the task has a retry attempt left, <code>false</code> otherwise
- */
- @Deprecated
- public boolean hasRetriesLeft() {
- if (this.retriesLeft.get() <= 0) {
- return false;
- }
- return true;
+
+ public void cancel() {
+ this.currentExecution.cancel();
}
-
- /**
- * Decrements the number of retries left and checks whether another attempt to run the task is possible.
- *
- * @return <code>true</code>if the task represented by this vertex can be started at least once more,
- * <code>false/<code> otherwise
- */
- public boolean decrementRetriesLeftAndCheck() {
- return (this.retriesLeft.decrementAndGet() > 0);
+
+ public void fail(Throwable t) {
+ this.currentExecution.fail(t);
}
-
- /**
- * Registers the {@link VertexAssignmentListener} object for this vertex. This object
- * will be notified about reassignments of this vertex to another instance.
- *
- * @param vertexAssignmentListener
- * the object to be notified about reassignments of this vertex to another instance
- */
- public void registerVertexAssignmentListener(final VertexAssignmentListener vertexAssignmentListener) {
-
- this.vertexAssignmentListeners.addIfAbsent(vertexAssignmentListener);
+
+ // --------------------------------------------------------------------------------------------
+ // Notifications from the Execution Attempt
+ // --------------------------------------------------------------------------------------------
+
+ void executionFinished() {
+ jobVertex.vertexFinished(subTaskIndex);
}
-
- /**
- * Unregisters the {@link VertexAssignmentListener} object for this vertex. This object
- * will no longer be notified about reassignments of this vertex to another instance.
- *
- * @param vertexAssignmentListener
- * the listener to be unregistered
- */
- public void unregisterVertexAssignmentListener(final VertexAssignmentListener vertexAssignmentListener) {
-
- this.vertexAssignmentListeners.remove(vertexAssignmentListener);
+
+ void executionCanceled() {
+ jobVertex.vertexCancelled(subTaskIndex);
}
-
-
- /**
- * Registers the {@link ExecutionListener} object for this vertex. This object
- * will be notified about particular events during the vertex's lifetime.
- *
- * @param executionListener
- * the object to be notified about particular events during the vertex's lifetime
- */
- public void registerExecutionListener(final ExecutionListener executionListener) {
-
- final Integer priority = Integer.valueOf(executionListener.getPriority());
-
- if (priority.intValue() < 0) {
- LOG.error("Priority for execution listener " + executionListener.getClass() + " must be non-negative.");
- return;
- }
-
- final ExecutionListener previousValue = this.executionListeners.putIfAbsent(priority, executionListener);
-
- if (previousValue != null) {
- LOG.error("Cannot register " + executionListener.getClass() + " as an execution listener. Priority "
- + priority.intValue() + " is already taken.");
- }
+
+ void executionFailed(Throwable t) {
+ jobVertex.vertexFailed(subTaskIndex, t);
}
-
+
+ // --------------------------------------------------------------------------------------------
+ // Miscellaneous
+ // --------------------------------------------------------------------------------------------
+
/**
- * Unregisters the {@link ExecutionListener} object for this vertex. This object
- * will no longer be notified about particular events during the vertex's lifetime.
+ * Simply forward this notification. This is for logs and event archivers.
*
- * @param executionListener
- * the object to be unregistered
+ * @param executionId
+ * @param newState
+ * @param error
*/
- public void unregisterExecutionListener(final ExecutionListener executionListener) {
-
- this.executionListeners.remove(Integer.valueOf(executionListener.getPriority()));
+ void notifyStateTransition(ExecutionAttemptID executionId, ExecutionState newState, Throwable error) {
+ getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);
}
-
-
- /**
- * Sets the {@link ExecutionPipeline} this vertex shall be part of.
- *
- * @param executionPipeline
- * the execution pipeline this vertex shall be part of
- */
- void setExecutionPipeline(final ExecutionPipeline executionPipeline) {
-
- final ExecutionPipeline oldPipeline = this.executionPipeline.getAndSet(executionPipeline);
- if (oldPipeline != null) {
- oldPipeline.removeFromPipeline(this);
+
+ TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID executionId, AllocatedSlot slot) {
+ // create the input gate deployment descriptors
+ List<GateDeploymentDescriptor> inputGates = new ArrayList<GateDeploymentDescriptor>(inputEdges.length);
+ for (ExecutionEdge[] channels : inputEdges) {
+ inputGates.add(GateDeploymentDescriptor.fromEdges(channels));
}
-
- executionPipeline.addToPipeline(this);
+
+ // create the output gate deployment descriptors
+ List<GateDeploymentDescriptor> outputGates = new ArrayList<GateDeploymentDescriptor>(resultPartitions.length);
+ for (IntermediateResultPartition partition : resultPartitions) {
+ for (List<ExecutionEdge> channels : partition.getConsumers()) {
+ outputGates.add(GateDeploymentDescriptor.fromEdges(channels));
+ }
+ }
+
+ String[] jarFiles = getExecutionGraph().getUserCodeJarFiles();
+
+ return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(),
+ subTaskIndex, getTotalNumberOfParallelSubtasks(),
+ getExecutionGraph().getJobConfiguration(), jobVertex.getJobVertex().getConfiguration(),
+ jobVertex.getJobVertex().getInvokableClassName(), outputGates, inputGates, jarFiles, slot.getSlotNumber());
}
-
- /**
- * Returns the {@link ExecutionPipeline} this vertex is part of.
- *
- * @return the execution pipeline this vertex is part of
- */
- public ExecutionPipeline getExecutionPipeline() {
-
- return this.executionPipeline.get();
+
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
+
+ public void execute(Runnable action) {
+ this.jobVertex.execute(action);
}
-
+
/**
- * Constructs a new task deployment descriptor for this vertex.
+ * Creates a simple name representation in the style 'taskname (x/y)', where
+ * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel
+ * subtask index as returned by {@link #getParallelSubtaskIndex()}{@code + 1}, and 'y' is the total
+ * number of tasks, as returned by {@link #getTotalNumberOfParallelSubtasks()}.
*
- * @return a new task deployment descriptor for this vertex
+ * @return A simple name representation.
*/
- public TaskDeploymentDescriptor constructDeploymentDescriptor() {
-
- final SerializableArrayList<GateDeploymentDescriptor> ogd = new SerializableArrayList<GateDeploymentDescriptor>(
- this.outputGates.length);
- for (int i = 0; i < this.outputGates.length; ++i) {
-
- final ExecutionGate eg = this.outputGates[i];
- final List<ChannelDeploymentDescriptor> cdd = new ArrayList<ChannelDeploymentDescriptor>(
- eg.getNumberOfEdges());
- final int numberOfOutputChannels = eg.getNumberOfEdges();
- for (int j = 0; j < numberOfOutputChannels; ++j) {
-
- final ExecutionEdge ee = eg.getEdge(j);
- cdd.add(new ChannelDeploymentDescriptor(ee.getOutputChannelID(), ee.getInputChannelID()));
- }
-
- ogd.add(new GateDeploymentDescriptor(eg.getGateID(), eg.getChannelType(), cdd));
- }
-
- final SerializableArrayList<GateDeploymentDescriptor> igd = new SerializableArrayList<GateDeploymentDescriptor>(
- this.inputGates.length);
- for (int i = 0; i < this.inputGates.length; ++i) {
-
- final ExecutionGate eg = this.inputGates[i];
- final List<ChannelDeploymentDescriptor> cdd = new ArrayList<ChannelDeploymentDescriptor>(
- eg.getNumberOfEdges());
- final int numberOfInputChannels = eg.getNumberOfEdges();
- for (int j = 0; j < numberOfInputChannels; ++j) {
-
- final ExecutionEdge ee = eg.getEdge(j);
- cdd.add(new ChannelDeploymentDescriptor(ee.getOutputChannelID(), ee.getInputChannelID()));
- }
-
- igd.add(new GateDeploymentDescriptor(eg.getGateID(), eg.getChannelType(), cdd));
- }
-
- final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(this.executionGraph.getJobID(),
- this.vertexID, this.groupVertex.getName(), this.indexInVertexGroup,
- this.groupVertex.getCurrentNumberOfGroupMembers(), this.executionGraph.getJobConfiguration(),
- this.groupVertex.getConfiguration(), this.groupVertex.getInvokableClass(), ogd,
- igd);
-
- return tdd;
+ public String getSimpleName() {
+ return getTaskName() + " (" + (getParallelSubtaskIndex()+1) + '/' + getTotalNumberOfParallelSubtasks() + ')';
}
- public void handleException(Throwable t) {
-
+ @Override
+ public String toString() {
+ return getSimpleName();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
deleted file mode 100644
index 787605d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex2.java
+++ /dev/null
@@ -1,710 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-import org.apache.commons.logging.Log;
-import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionState2;
-import org.apache.flink.runtime.instance.AllocatedSlot;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobEdge;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
-import org.apache.flink.runtime.taskmanager.TaskOperationResult;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-
-import static org.apache.flink.runtime.execution.ExecutionState2.*;
-
-/**
- *
- * NOTE ABOUT THE DESIGN RATIONAL:
- *
- * In several points of the code, we need to deal with possible concurrent state changes and actions.
- * For example, while the call to deploy a task (send it to the TaskManager) happens, the task gets cancelled.
- *
- * We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that
- * it is guaranteed that any "cancel command" will only pick up after deployment is done and that the "cancel
- * command" call will never overtake the deploying call.
- *
- * This blocks the threads big time, because the remote calls may take long. Depending of their locking behavior, it
- * may even result in distributed deadlocks (unless carefully avoided). We therefore use atomic state updates and
- * occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting
- * actions if it is not. Many actions are also idempotent (like canceling).
- */
-public class ExecutionVertex2 {
-
- private static final AtomicReferenceFieldUpdater<ExecutionVertex2, ExecutionState2> STATE_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, ExecutionState2.class, "state");
-
- private static final AtomicReferenceFieldUpdater<ExecutionVertex2, ExecutionAttempt> ATTEMPT_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(ExecutionVertex2.class, ExecutionAttempt.class, "currentOrLastAttempt");
-
- private static final Logger LOG = ExecutionGraph.LOG;
-
- private static final int NUM_CANCEL_CALL_TRIES = 3;
-
- // --------------------------------------------------------------------------------------------
-
- private final ExecutionJobVertex jobVertex;
-
- private final IntermediateResultPartition[] resultPartitions;
-
- private final ExecutionEdge2[][] inputEdges;
-
- private final int subTaskIndex;
-
- private final long[] stateTimestamps;
-
- private final List<ExecutionAttempt> priorAttempts;
-
- private volatile ExecutionAttempt currentOrLastAttempt;
-
- private volatile ExecutionState2 state = CREATED;
-
-
- public ExecutionVertex2(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets) {
- this.jobVertex = jobVertex;
- this.subTaskIndex = subTaskIndex;
-
- this.resultPartitions = new IntermediateResultPartition[producedDataSets.length];
- for (int i = 0; i < producedDataSets.length; i++) {
- IntermediateResultPartition irp = new IntermediateResultPartition(producedDataSets[i], this, subTaskIndex);
- this.resultPartitions[i] = irp;
- producedDataSets[i].setPartition(subTaskIndex, irp);
- }
-
- this.inputEdges = new ExecutionEdge2[jobVertex.getJobVertex().getInputs().size()][];
-
- this.stateTimestamps = new long[ExecutionState2.values().length];
- this.priorAttempts = new CopyOnWriteArrayList<ExecutionAttempt>();
- }
-
-
- // --------------------------------------------------------------------------------------------
- // Properties
- // --------------------------------------------------------------------------------------------
-
- public JobID getJobId() {
- return this.jobVertex.getJobId();
- }
-
- public JobVertexID getJobvertexId() {
- return this.jobVertex.getJobVertexId();
- }
-
- public String getTaskName() {
- return this.jobVertex.getJobVertex().getName();
- }
-
- public int getTotalNumberOfParallelSubtasks() {
- return this.jobVertex.getParallelism();
- }
-
- public int getParallelSubtaskIndex() {
- return this.subTaskIndex;
- }
-
- public int getNumberOfInputs() {
- return this.inputEdges.length;
- }
-
- public ExecutionEdge2[] getInputEdges(int input) {
- if (input < 0 || input >= this.inputEdges.length) {
- throw new IllegalArgumentException(String.format("Input %d is out of range [0..%d)", input, this.inputEdges.length));
- }
- return inputEdges[input];
- }
-
- public ExecutionState2 getExecutionState() {
- return state;
- }
-
- public long getStateTimestamp(ExecutionState2 state) {
- return this.stateTimestamps[state.ordinal()];
- }
-
- private ExecutionGraph getExecutionGraph() {
- return this.jobVertex.getGraph();
- }
-
- public Throwable getLastFailureCause() {
- // copy reference to the stack
- ExecutionAttempt attempt = this.currentOrLastAttempt;
- if (attempt != null) {
- return attempt.getFailureCause();
- }
- else if (priorAttempts.size() > 0) {
- // since the list is append-only, this always works in the presence of concurrent modifications
- return priorAttempts.get(priorAttempts.size() - 1).getFailureCause();
- }
- else {
- return null;
- }
- }
-
- public AllocatedSlot getCurrentAssignedResource() {
- // copy reference to the stack
- ExecutionAttempt attempt = this.currentOrLastAttempt;
- return attempt == null ? null : attempt.getAssignedResource();
- }
-
- public AllocatedSlot getLastAssignedResource() {
- // copy reference to the stack
- ExecutionAttempt attempt = this.currentOrLastAttempt;
- if (attempt != null) {
- return attempt.getAssignedResource();
- }
- else if (priorAttempts.size() > 0) {
- // since the list is append-only, this always works in the presence of concurrent modifications
- return priorAttempts.get(priorAttempts.size() - 1).getAssignedResource();
- }
- else {
- return null;
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Graph building
- // --------------------------------------------------------------------------------------------
-
- public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
-
- final DistributionPattern pattern = edge.getDistributionPattern();
- final IntermediateResultPartition[] sourcePartitions = source.getPartitions();
-
- ExecutionEdge2[] edges = null;
-
- switch (pattern) {
- case POINTWISE:
- edges = connectPointwise(sourcePartitions, inputNumber);
- break;
-
- case BIPARTITE:
- edges = connectAllToAll(sourcePartitions, inputNumber);
- break;
-
- default:
- throw new RuntimeException("Unrecognized distribution pattern.");
-
- }
-
- this.inputEdges[inputNumber] = edges;
-
- // add the cousumers to the source
- for (ExecutionEdge2 ee : edges) {
- ee.getSource().addConsumer(ee, consumerNumber);
- }
- }
-
- private ExecutionEdge2[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
- ExecutionEdge2[] edges = new ExecutionEdge2[sourcePartitions.length];
-
- for (int i = 0; i < sourcePartitions.length; i++) {
- IntermediateResultPartition irp = sourcePartitions[i];
- edges[i] = new ExecutionEdge2(irp, this, inputNumber);
- }
-
- return edges;
- }
-
- private ExecutionEdge2[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
- final int numSources = sourcePartitions.length;
- final int parallelism = getTotalNumberOfParallelSubtasks();
-
- // simple case same number of sources as targets
- if (numSources == parallelism) {
- return new ExecutionEdge2[] { new ExecutionEdge2(sourcePartitions[subTaskIndex], this, inputNumber) };
- }
- else if (numSources < parallelism) {
-
- int sourcePartition;
-
- // check if the pattern is regular or irregular
- // we use int arithmetics for regular, and floating point with rounding for irregular
- if (parallelism % numSources == 0) {
- // same number of targets per source
- int factor = parallelism / numSources;
- sourcePartition = subTaskIndex / factor;
- }
- else {
- // different number of targets per source
- float factor = ((float) parallelism) / numSources;
- sourcePartition = (int) (subTaskIndex / factor);
- }
-
- return new ExecutionEdge2[] { new ExecutionEdge2(sourcePartitions[sourcePartition], this, inputNumber) };
- }
- else {
- if (numSources % parallelism == 0) {
- // same number of targets per source
- int factor = numSources / parallelism;
- int startIndex = subTaskIndex * factor;
-
- ExecutionEdge2[] edges = new ExecutionEdge2[factor];
- for (int i = 0; i < factor; i++) {
- edges[i] = new ExecutionEdge2(sourcePartitions[startIndex + i], this, inputNumber);
- }
- return edges;
- }
- else {
- float factor = ((float) numSources) / parallelism;
-
- int start = (int) (subTaskIndex * factor);
- int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ?
- sourcePartitions.length :
- (int) ((subTaskIndex + 1) * factor);
-
- ExecutionEdge2[] edges = new ExecutionEdge2[end - start];
- for (int i = 0; i < edges.length; i++) {
- edges[i] = new ExecutionEdge2(sourcePartitions[start + i], this, inputNumber);
- }
-
- return edges;
- }
- }
- }
-
-
- // --------------------------------------------------------------------------------------------
- // Scheduling
- // --------------------------------------------------------------------------------------------
-
- /**
- * NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the tasks needs
- * to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any
- * error sets the vertex state to failed and triggers the recovery logic.
- *
- * @param scheduler
- *
- * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
- * @throws NoResourceAvailableException Thrown is no queued scheduling is allowed and no resources are currently available.
- */
- public void scheduleForExecution(DefaultScheduler scheduler) throws NoResourceAvailableException {
- if (scheduler == null) {
- throw new NullPointerException();
- }
-
- if (STATE_UPDATER.compareAndSet(this, CREATED, SCHEDULED)) {
-
- getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, SCHEDULED, null);
-
- ScheduledUnit toSchedule = new ScheduledUnit(this, jobVertex.getSlotSharingGroup());
-
- // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
- // in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
-
- boolean queued = jobVertex.getGraph().isQueuedSchedulingAllowed();
- if (queued) {
- SlotAllocationFuture future = scheduler.scheduleQueued(toSchedule);
-
- future.setFutureAction(new SlotAllocationFutureAction() {
- @Override
- public void slotAllocated(AllocatedSlot slot) {
- try {
- deployToSlot(slot);
- }
- catch (Throwable t) {
- try {
- slot.releaseSlot();
- } finally {
- fail(t);
- }
- }
- }
- });
- }
- else {
- AllocatedSlot slot = scheduler.scheduleImmediately(toSchedule);
- try {
- deployToSlot(slot);
- }
- catch (Throwable t) {
- try {
- slot.releaseSlot();
- } finally {
- fail(t);
- }
- }
- }
- }
- else if (this.state == CANCELED) {
- // this can occur very rarely through heavy races. if the task was canceled, we do not
- // schedule it
- return;
- }
- else {
- throw new IllegalStateException("The vertex must be in CREATED state to be scheduled.");
- }
- }
-
-
- public void deployToSlot(final AllocatedSlot slot) throws JobException {
- // sanity checks
- if (slot == null) {
- throw new NullPointerException();
- }
- if (!slot.isAlive()) {
- throw new IllegalArgumentException("Cannot deploy to a slot that is not alive.");
- }
-
- // make sure exactly one deployment call happens from the correct state
- // note: the transition from CREATED to DEPLOYING is for testing purposes only
- ExecutionState2 previous = this.state;
- if (previous == SCHEDULED || previous == CREATED) {
- if (!STATE_UPDATER.compareAndSet(this, previous, DEPLOYING)) {
- // race condition, someone else beat us to the deploying call.
- // this should actually not happen and indicates a race somewhere else
- throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
- }
-
- getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, DEPLOYING, null);
- }
- else {
- // vertex may have been cancelled, or it was already scheduled
- throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
- }
-
- // good, we are allowed to deploy
- if (!slot.setExecutedVertex(this)) {
- throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
- }
- setAssignedSlot(slot);
-
-
- final TaskDeploymentDescriptor deployment = createDeploymentDescriptor();
-
- // we execute the actual deploy call in a concurrent action to prevent this call from blocking for long
- Runnable deployaction = new Runnable() {
-
- @Override
- public void run() {
- try {
- Instance instance = slot.getInstance();
- instance.checkLibraryAvailability(getJobId());
-
- TaskOperationResult result = instance.getTaskManagerProxy().submitTask(deployment);
- if (result.isSuccess()) {
- switchToRunning();
- }
- else {
- // deployment failed :(
- fail(new Exception("Failed to deploy the tast to slot " + slot + ": " + result.getDescription()));
- }
- }
- catch (Throwable t) {
- // some error occurred. fail the task
- fail(t);
- }
- }
- };
-
- execute(deployaction);
- }
-
- private void switchToRunning() {
-
- // transition state
- if (STATE_UPDATER.compareAndSet(ExecutionVertex2.this, DEPLOYING, RUNNING)) {
-
- getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, RUNNING, null);
-
- this.jobVertex.vertexSwitchedToRunning(subTaskIndex);
- }
- else {
- // something happened while the call was in progress.
- // typically, that means canceling while deployment was in progress
-
- ExecutionState2 currentState = ExecutionVertex2.this.state;
-
- if (currentState == CANCELING) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Concurrent canceling of task %s while deployment was in progress.", ExecutionVertex2.this.toString()));
- }
-
- sendCancelRpcCall();
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Concurrent unexpected state transition of task %s while deployment was in progress.", ExecutionVertex2.this.toString()));
- }
-
- // undo the deployment
- sendCancelRpcCall();
-
- // record the failure
- fail(new Exception("Asynchronous state error. Execution Vertex switched to " + currentState + " while deployment was in progress."));
- }
- }
- }
-
- public void cancel() {
- // depending on the previous state, we go directly to cancelled (no cancel call necessary)
- // -- or to canceling (cancel call needs to be sent to the task manager)
-
- // because of several possibly previous states, we need to again loop until we make a
- // successful atomic state transition
- while (true) {
-
- ExecutionState2 current = this.state;
-
- if (current == CANCELING || current == CANCELED) {
- // already taken care of, no need to cancel again
- return;
- }
-
- // these two are the common cases where we need to send a cancel call
- else if (current == RUNNING || current == DEPLOYING) {
- // try to transition to canceling, if successful, send the cancel call
- if (STATE_UPDATER.compareAndSet(this, current, CANCELING)) {
-
- getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, CANCELING, null);
-
- sendCancelRpcCall();
- return;
- }
- // else: fall through the loop
- }
-
- else if (current == FINISHED || current == FAILED) {
- // nothing to do any more. finished failed before it could be cancelled.
- // in any case, the task is removed from the TaskManager already
- return;
- }
- else if (current == CREATED || current == SCHEDULED) {
- // from here, we can directly switch to cancelled, because the no task has been deployed
- if (STATE_UPDATER.compareAndSet(this, current, CANCELED)) {
-
- getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, CANCELED, null);
-
- return;
- }
- // else: fall through the loop
- }
- else {
- throw new IllegalStateException(current.name());
- }
- }
- }
-
- public void fail(Throwable t) {
-
- // damn, we failed. This means only that we keep our books and notify our parent JobExecutionVertex
- // the actual computation on the task manager is cleaned up by the taskmanager that noticed the failure
-
- // we may need to loop multiple times (in the presence of concurrent calls) in order to
- // atomically switch to failed
- while (true) {
- ExecutionState2 current = this.state;
-
- if (current == FAILED) {
- // concurrently set to failed. It is enough to remember once that we failed (its sad enough)
- return;
- }
-
- if (current == CANCELED) {
- // we already aborting
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Ignoring transition of vertex %s to %s while being %s",
- getSimpleName(), FAILED, current));
- }
- return;
- }
-
- // we should be in DEPLOYING or RUNNING when a regular failure happens
- if (current != DEPLOYING && current != RUNNING && current != CANCELING) {
- // this should not happen. still, what else to do but to comply and go to the FAILED state
- // at least we should complain loudly to the log
- LOG.error(String.format("Vertex %s unexpectedly went from state %s to %s with error: %s",
- getSimpleName(), CREATED, FAILED, t.getMessage()), t);
- }
-
- if (STATE_UPDATER.compareAndSet(this, current, FAILED)) {
- // success (in a manner of speaking)
- this.failureCause = t;
-
- getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, FAILED, StringUtils.stringifyException(t));
-
- // release the slot (concurrency safe)
- setAssignedSlot(null);
-
- this.jobVertex.vertexFailed(subTaskIndex);
-
- // leave the loop
- return;
- }
- }
- }
-
- private void sendCancelRpcCall() {
- // first of all, copy a reference to the stack. any concurrent change to the
- // field does not affect us now
- final AllocatedSlot slot = this.assignedSlot;
- if (slot == null) {
- throw new IllegalStateException("Cannot cancel when task was not running or deployed.");
- }
-
- Runnable cancelAction = new Runnable() {
-
- @Override
- public void run() {
- Throwable exception = null;
-
- for (int triesLeft = NUM_CANCEL_CALL_TRIES; triesLeft > 0; --triesLeft) {
-
- try {
- // send the call. it may be that the task is not really there (asynchronous / overtaking messages)
- // in which case it is fine (the deployer catches it)
- TaskOperationResult result = slot.getInstance().getTaskManagerProxy().cancelTask(getJobvertexId(), subTaskIndex);
-
- if (result.isSuccess()) {
-
- // make sure that we release the slot
- try {
- // found and canceled
- if (STATE_UPDATER.compareAndSet(ExecutionVertex2.this, CANCELING, CANCELED)) {
- // we completed the call.
- // release the slot resource and let the parent know we have cancelled
- ExecutionVertex2.this.jobVertex.vertexCancelled(ExecutionVertex2.this.subTaskIndex);
- }
- else {
- ExecutionState2 foundState = ExecutionVertex2.this.state;
- // failing in the meantime may happen and is no problem
- if (foundState != FAILED) {
- // corner case? log at least
- LOG.error(String.format("Asynchronous race: Found state %s after successful cancel call.", foundState));
- }
-
- }
- } finally {
- slot.releaseSlot();
- }
- }
- else {
- // the task was not found, which may happen when the task concurrently finishes or fails, or
- // when the cancel call overtakes the deployment call
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cancel task call did not find task. Probably cause: Acceptable asynchronous race.");
- }
- }
-
- // in any case, we need not call multiple times, so we quit
- return;
- }
- catch (Throwable t) {
- if (exception == null) {
- exception = t;
- }
- LOG.error("Canceling vertex " + getSimpleName() + " failed (" + triesLeft + " tries left): " + t.getMessage() , t);
- }
- }
-
- // dang, utterly unsuccessful - the target node must be down, in which case the tasks are lost anyways
- fail(new Exception("Task could not be canceled.", exception));
- }
- };
-
- execute(cancelAction);
- }
-
- public Iterable<Instance> getPreferredLocations() {
- return null;
- }
-
- private void setAssignedSlot(AllocatedSlot slot) {
-
- while (true) {
- AllocatedSlot previous = this.assignedSlot;
- if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, previous, slot)) {
- // successfully swapped
- // release the predecessor, if it was not null. this call is idempotent, so it does not matter if it is
- // called more than once
- try {
- if (previous != null) {
- previous.releaseSlot();
- }
- } catch (Throwable t) {
- LOG.debug("Error releasing slot " + slot, t);
- }
- return;
- }
- }
- }
-
-
- private TaskDeploymentDescriptor createDeploymentDescriptor() {
- // create the input gate deployment descriptors
- List<GateDeploymentDescriptor> inputGates = new ArrayList<GateDeploymentDescriptor>(inputEdges.length);
- for (ExecutionEdge2[] channels : inputEdges) {
- inputGates.add(GateDeploymentDescriptor.fromEdges(channels));
- }
-
- // create the output gate deployment descriptors
- List<GateDeploymentDescriptor> outputGates = new ArrayList<GateDeploymentDescriptor>(resultPartitions.length);
- for (IntermediateResultPartition partition : resultPartitions) {
- for (List<ExecutionEdge2> channels : partition.getConsumers()) {
- outputGates.add(GateDeploymentDescriptor.fromEdges(channels));
- }
- }
-
- String[] jarFiles = getExecutionGraph().getUserCodeJarFiles();
-
- return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), getTaskName(),
- subTaskIndex, getTotalNumberOfParallelSubtasks(),
- getExecutionGraph().getJobConfiguration(), jobVertex.getJobVertex().getConfiguration(),
- jobVertex.getJobVertex().getInvokableClassName(), outputGates, inputGates, jarFiles);
- }
-
- // --------------------------------------------------------------------------------------------
- // Utilities
- // --------------------------------------------------------------------------------------------
-
- public void execute(Runnable action) {
- this.jobVertex.execute(action);
- }
-
- /**
- * Creates a simple name representation in the style 'taskname (x/y)', where
- * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel
- * subtask index as returned by {@link #getParallelSubtaskIndex()}{@code + 1}, and 'y' is the total
- * number of tasks, as returned by {@link #getTotalNumberOfParallelSubtasks()}.
- *
- * @return A simple name representation.
- */
- public String getSimpleName() {
- return getTaskName() + " (" + (getParallelSubtaskIndex()+1) + '/' + getTotalNumberOfParallelSubtasks() + ')';
- }
-
- @Override
- public String toString() {
- return getSimpleName() + " [" + state + ']';
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index b733baa..1c4e1fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -25,21 +25,21 @@ public class IntermediateResultPartition {
private final IntermediateResult totalResut;
- private final ExecutionVertex2 producer;
+ private final ExecutionVertex producer;
private final int partition;
- private List<List<ExecutionEdge2>> consumers;
+ private List<List<ExecutionEdge>> consumers;
- public IntermediateResultPartition(IntermediateResult totalResut, ExecutionVertex2 producer, int partition) {
+ public IntermediateResultPartition(IntermediateResult totalResut, ExecutionVertex producer, int partition) {
this.totalResut = totalResut;
this.producer = producer;
this.partition = partition;
- this.consumers = new ArrayList<List<ExecutionEdge2>>(0);
+ this.consumers = new ArrayList<List<ExecutionEdge>>(0);
}
- public ExecutionVertex2 getProducer() {
+ public ExecutionVertex getProducer() {
return producer;
}
@@ -51,17 +51,23 @@ public class IntermediateResultPartition {
return totalResut;
}
- public List<List<ExecutionEdge2>> getConsumers() {
+ public List<List<ExecutionEdge>> getConsumers() {
return consumers;
}
int addConsumerGroup() {
int pos = consumers.size();
- consumers.add(new ArrayList<ExecutionEdge2>());
+
+ // NOTE: currently we support only one consumer per result!!!
+ if (pos != 0) {
+ throw new RuntimeException("Currenty, each intermediate result can only have one consumer.");
+ }
+
+ consumers.add(new ArrayList<ExecutionEdge>());
return pos;
}
- public void addConsumer(ExecutionEdge2 edge, int consumerNumber) {
+ void addConsumer(ExecutionEdge edge, int consumerNumber) {
consumers.get(consumerNumber).add(edge);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index f3f489b..2f3fa10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.instance;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.jobgraph.JobID;
/**
@@ -32,8 +32,8 @@ public class AllocatedSlot {
private static final AtomicIntegerFieldUpdater<AllocatedSlot> STATUS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AllocatedSlot.class, "status");
- private static final AtomicReferenceFieldUpdater<AllocatedSlot, ExecutionVertex2> VERTEX_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(AllocatedSlot.class, ExecutionVertex2.class, "executedVertex");
+ private static final AtomicReferenceFieldUpdater<AllocatedSlot, Execution> VERTEX_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(AllocatedSlot.class, Execution.class, "executedTask");
private static final int ALLOCATED_AND_ALIVE = 0; // tasks may be added and might be running
private static final int CANCELLED = 1; // no more tasks may run
@@ -49,8 +49,8 @@ public class AllocatedSlot {
/** The number of the slot on which the task is deployed */
private final int slotNumber;
- /** Vertex being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */
- private volatile ExecutionVertex2 executedVertex;
+ /** Task being executed in the slot. Volatile to force a memory barrier and allow for correct double-checking */
+ private volatile Execution executedTask;
/** The state of the vertex, only atomically updated */
private volatile int status = ALLOCATED_AND_ALIVE;
@@ -85,7 +85,11 @@ public class AllocatedSlot {
return slotNumber;
}
- public boolean setExecutedVertex(ExecutionVertex2 executedVertex) {
+ public Execution getExecutedVertex() {
+ return executedTask;
+ }
+
+ public boolean setExecutedVertex(Execution executedVertex) {
if (executedVertex == null) {
throw new NullPointerException();
}
@@ -102,17 +106,13 @@ public class AllocatedSlot {
// we need to do a double check that we were not cancelled in the meantime
if (status != ALLOCATED_AND_ALIVE) {
- this.executedVertex = null;
+ this.executedTask = null;
return false;
}
return true;
}
- public ExecutionVertex2 getExecutedVertex() {
- return executedVertex;
- }
-
// --------------------------------------------------------------------------------------------
// Status and life cycle
// --------------------------------------------------------------------------------------------
@@ -133,8 +133,9 @@ public class AllocatedSlot {
public void cancel() {
if (STATUS_UPDATER.compareAndSet(this, ALLOCATED_AND_ALIVE, CANCELLED)) {
// kill all tasks currently running in this slot
- if (this.executedVertex != null) {
- this.executedVertex.fail(new Exception("The slot in which the task was scheduled has been cancelled."));
+ Execution exec = this.executedTask;
+ if (exec != null && !exec.isFinished()) {
+ exec.fail(new Exception("The slot in which the task was scheduled has been cancelled."));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 543ae86..bc30254 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -197,19 +197,21 @@ public class Instance {
throw new IOException("No entry of required libraries for job " + jobID);
}
- LibraryCacheProfileRequest request = new LibraryCacheProfileRequest();
- request.setRequiredLibraries(requiredLibraries);
-
- // Send the request
- LibraryCacheProfileResponse response = getTaskManagerProxy().getLibraryCacheProfile(request);
-
- // Check response and transfer libraries if necessary
- for (int k = 0; k < requiredLibraries.length; k++) {
- if (!response.isCached(k)) {
- LibraryCacheUpdate update = new LibraryCacheUpdate(requiredLibraries[k]);
- getTaskManagerProxy().updateLibraryCache(update);
+// if (requiredLibraries.length > 0) {
+ LibraryCacheProfileRequest request = new LibraryCacheProfileRequest();
+ request.setRequiredLibraries(requiredLibraries);
+
+ // Send the request
+ LibraryCacheProfileResponse response = getTaskManagerProxy().getLibraryCacheProfile(request);
+
+ // Check response and transfer libraries if necessary
+ for (int k = 0; k < requiredLibraries.length; k++) {
+ if (!response.isCached(k)) {
+ LibraryCacheUpdate update = new LibraryCacheUpdate(requiredLibraries[k]);
+ getTaskManagerProxy().updateLibraryCache(update);
+ }
}
- }
+// }
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index ec63c00..c17e631 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -23,9 +23,9 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.StringUtils;
/**
* This class encapsulates all connection information necessary to connect to the instance's task manager.
@@ -154,9 +154,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
this.ipcPort = in.readInt();
this.dataPort = in.readInt();
- if (in.readBoolean()) {
- this.hostName = StringRecord.readString(in);
- }
+ this.hostName = StringUtils.readNullableString(in);
try {
this.inetAddress = InetAddress.getByAddress(address);
@@ -174,12 +172,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
out.writeInt(this.ipcPort);
out.writeInt(this.dataPort);
- if (this.hostName != null) {
- out.writeBoolean(true);
- StringRecord.writeString(out, this.hostName);
- } else {
- out.writeBoolean(false);
- }
+ StringUtils.writeNullableString(hostName, out);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index 5571ccb..3066bb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -110,6 +110,10 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
this.globalBufferPool.destroy();
}
+ public GlobalBufferPool getGlobalBufferPool() {
+ return globalBufferPool;
+ }
+
// -----------------------------------------------------------------------------------------------------------------
// Task registration
// -----------------------------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
index 23d1205..cdf9b87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.network;
import java.io.IOException;
@@ -28,18 +27,17 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * Objects of this class uniquely identify a connection to a remote {@link TaskManager}.
- *
+ * Objects of this class uniquely identify a connection to a remote {@link org.apache.flink.runtime.taskmanager.TaskManager}.
*/
public final class RemoteReceiver implements IOReadableWritable {
/**
- * The address of the connection to the remote {@link TaskManager}.
+ * The address of the connection to the remote TaskManager.
*/
private InetSocketAddress connectionAddress;
/**
- * The index of the connection to the remote {@link TaskManager}.
+ * The index of the connection to the remote TaskManager.
*/
private int connectionIndex;
@@ -47,9 +45,9 @@ public final class RemoteReceiver implements IOReadableWritable {
* Constructs a new remote receiver object.
*
* @param connectionAddress
- * the address of the connection to the remote {@link TaskManager}
+ * the address of the connection to the remote TaskManager
* @param connectionIndex
- * the index of the connection to the remote {@link TaskManager}
+ * the index of the connection to the remote TaskManager
*/
public RemoteReceiver(final InetSocketAddress connectionAddress, final int connectionIndex) {
if (connectionAddress == null) {
@@ -72,18 +70,18 @@ public final class RemoteReceiver implements IOReadableWritable {
}
/**
- * Returns the address of the connection to the remote {@link TaskManager}.
+ * Returns the address of the connection to the remote TaskManager.
*
- * @return the address of the connection to the remote {@link TaskManager}
+ * @return the address of the connection to the remote TaskManager
*/
public InetSocketAddress getConnectionAddress() {
return this.connectionAddress;
}
/**
- * Returns the index of the connection to the remote {@link TaskManager}.
+ * Returns the index of the connection to the remote TaskManager.
*
- * @return the index of the connection to the remote {@link TaskManager}
+ * @return the index of the connection to the remote TaskManager
*/
public int getConnectionIndex() {
return this.connectionIndex;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java
index 6f494eb..5a0fbb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java
@@ -16,23 +16,13 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.io.network.api;
import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
-/**
- *
- */
public interface MutableReader<T extends IOReadableWritable> extends ReaderBase {
- /**
- * @param target
- * @return
- * @throws IOException
- * @throws InterruptedException
- */
boolean next(T target) throws IOException, InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae139f5a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
index 27658f1..1fecdbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.iterative.concurrent;
import java.util.concurrent.CountDownLatch;
/**
- * Resettable barrier to synchronize {@link IterationHeadPactTask} and {@link IterationTailPactTask} in case of
+ * Resettable barrier to synchronize the
+ * {@link org.apache.flink.runtime.iterative.task.IterationHeadPactTask} and
+ * the {@link org.apache.flink.runtime.iterative.task.IterationTailPactTask} in case of
* iterations that contain a separate solution set tail.
*/
public class SolutionSetUpdateBarrier {