You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/23 22:06:06 UTC
[09/11] flink git commit: [hotfix] Fix checkstyle violations in ExecutionVertex
[hotfix] Fix checkstyle violations in ExecutionVertex
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88572dd3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88572dd3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88572dd3
Branch: refs/heads/master
Commit: 88572dd3821cce33066e226a3d33bc8b073e2109
Parents: 3422ee8
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 20:43:44 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionVertex.java | 80 ++++++++++----------
1 file changed, 39 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/88572dd3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 8b57a7a..e385318 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -33,9 +33,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -46,6 +44,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EvictingBoundedList;
@@ -96,12 +96,12 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
private final Time timeout;
- /** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
+ /** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations. */
private final String taskNameWithSubtask;
private volatile CoLocationConstraint locationConstraint;
- /** The current or latest execution attempt of this vertex's task */
+ /** The current or latest execution attempt of this vertex's task. */
private volatile Execution currentExecution; // this field must never be null
// --------------------------------------------------------------------------------------------
@@ -117,17 +117,18 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
Time timeout) {
this(
- jobVertex,
- subTaskIndex,
- producedDataSets,
- timeout,
- 1L,
- System.currentTimeMillis(),
- JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
+ jobVertex,
+ subTaskIndex,
+ producedDataSets,
+ timeout,
+ 1L,
+ System.currentTimeMillis(),
+ JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
}
/**
- *
+ * Creates an ExecutionVertex.
+ *
* @param timeout
* The RPC timeout to use for deploy / cancel calls
* @param initialGlobalModVersion
@@ -311,7 +312,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
/**
* Gets the location where the latest completed/canceled/failed execution of the vertex's
* task happened.
- *
+ *
* @return The latest prior execution location, or null, if there is none, yet.
*/
public TaskManagerLocation getLatestPriorLocation() {
@@ -444,36 +445,36 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
/**
* Gets the overall preferred execution location for this vertex's current execution.
* The preference is determined as follows:
- *
+ *
* <ol>
* <li>If the task execution has state to load (from a checkpoint), then the location preference
* is the location of the previous execution (if there is a previous execution attempt).
* <li>If the task execution has no state or no previous location, then the location preference
* is based on the task's inputs.
* </ol>
- *
- * These rules should result in the following behavior:
- *
+ *
+ * <p>These rules should result in the following behavior:
+ *
* <ul>
* <li>Stateless tasks are always scheduled based on co-location with inputs.
* <li>Stateful tasks are on their initial attempt executed based on co-location with inputs.
* <li>Repeated executions of stateful tasks try to co-locate the execution with its state.
* </ul>
- *
- * @return The preferred execution locations for the execution attempt.
- *
+ *
* @see #getPreferredLocationsBasedOnState()
- * @see #getPreferredLocationsBasedOnInputs()
+ * @see #getPreferredLocationsBasedOnInputs()
+ *
+ * @return The preferred execution locations for the execution attempt.
*/
public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocations() {
Collection<CompletableFuture<TaskManagerLocation>> basedOnState = getPreferredLocationsBasedOnState();
return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs();
}
-
+
/**
* Gets the preferred location to execute the current task execution attempt, based on the state
* that the execution attempt will resume.
- *
+ *
* @return A size-one collection with the location preference, or null, if there is no
* location preference based on the state.
*/
@@ -542,27 +543,25 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
/**
* Archives the current Execution and creates a new Execution for this vertex.
- *
+ *
* <p>This method atomically checks if the ExecutionGraph is still of an expected
* global mod. version and replaces the execution if that is the case. If the ExecutionGraph
* has increased its global mod. version in the meantime, this operation fails.
- *
+ *
* <p>This mechanism can be used to prevent conflicts between various concurrent recovery and
* reconfiguration actions in a similar way as "optimistic concurrency control".
- *
+ *
* @param timestamp
* The creation timestamp for the new Execution
* @param originatingGlobalModVersion
- * The
- *
- * @return Returns the new created Execution.
- *
+ *
+ * @return Returns the new created Execution.
+ *
* @throws GlobalModVersionMismatch Thrown, if the execution graph has a new global mod
* version than the one passed to this message.
*/
public Execution resetForNewExecution(final long timestamp, final long originatingGlobalModVersion)
- throws GlobalModVersionMismatch
- {
+ throws GlobalModVersionMismatch {
LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex());
synchronized (priorExecutions) {
@@ -642,12 +641,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
/**
- *
+ * Cancels this ExecutionVertex.
+ *
* @return A future that completes once the execution has reached its final state.
*/
public CompletableFuture<?> cancel() {
// to avoid any case of mixup in the presence of concurrent calls,
- // we copy a reference to the stack to make sure both calls go to the same Execution
+ // we copy a reference to the stack to make sure both calls go to the same Execution
final Execution exec = this.currentExecution;
exec.cancel();
return exec.getReleaseFuture();
@@ -742,7 +742,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
// --------------------------------------------------------------------------------------------
/**
- * Simply forward this notification
+ * Simply forward this notification.
*/
void notifyStateTransition(Execution execution, ExecutionState newState, Throwable error) {
// only forward this notification if the execution is still the current execution
@@ -754,7 +754,6 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
/**
* Creates a task deployment descriptor to deploy a subtask to the given target slot.
- *
* TODO: This should actually be in the EXECUTION
*/
TaskDeploymentDescriptor createDeploymentDescriptor(
@@ -762,13 +761,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
LogicalSlot targetSlot,
@Nullable JobManagerTaskRestore taskRestore,
int attemptNumber) throws ExecutionGraphException {
-
+
// Produced intermediate results
List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<>(resultPartitions.size());
-
+
// Consumed intermediate results
List<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<>(inputEdges.length);
-
+
boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment();
for (IntermediateResultPartition partition : resultPartitions.values()) {
@@ -791,8 +790,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, maxParallelism, lazyScheduling));
}
}
-
-
+
for (ExecutionEdge[] edges : inputEdges) {
InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor.fromEdges(
edges,