You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/23 22:06:06 UTC

[09/11] flink git commit: [hotfix] Fix checkstyle violations in ExecutionVertex

[hotfix] Fix checkstyle violations in ExecutionVertex


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88572dd3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88572dd3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88572dd3

Branch: refs/heads/master
Commit: 88572dd3821cce33066e226a3d33bc8b073e2109
Parents: 3422ee8
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 20:43:44 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionVertex.java | 80 ++++++++++----------
 1 file changed, 39 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/88572dd3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 8b57a7a..e385318 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -33,9 +33,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -46,6 +44,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
@@ -96,12 +96,12 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	private final Time timeout;
 
-	/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
+	/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations. */
 	private final String taskNameWithSubtask;
 
 	private volatile CoLocationConstraint locationConstraint;
 
-	/** The current or latest execution attempt of this vertex's task */
+	/** The current or latest execution attempt of this vertex's task. */
 	private volatile Execution currentExecution;	// this field must never be null
 
 	// --------------------------------------------------------------------------------------------
@@ -117,17 +117,18 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			Time timeout) {
 
 		this(
-				jobVertex,
-				subTaskIndex,
-				producedDataSets,
-				timeout,
-				1L,
-				System.currentTimeMillis(),
-				JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
+			jobVertex,
+			subTaskIndex,
+			producedDataSets,
+			timeout,
+			1L,
+			System.currentTimeMillis(),
+			JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
 	}
 
 	/**
-	 * 
+	 * Creates an ExecutionVertex.
+	 *
 	 * @param timeout
 	 *            The RPC timeout to use for deploy / cancel calls
 	 * @param initialGlobalModVersion
@@ -311,7 +312,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	/**
 	 * Gets the location where the latest completed/canceled/failed execution of the vertex's
 	 * task happened.
-	 * 
+	 *
 	 * @return The latest prior execution location, or null, if there is none, yet.
 	 */
 	public TaskManagerLocation getLatestPriorLocation() {
@@ -444,36 +445,36 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	/**
 	 * Gets the overall preferred execution location for this vertex's current execution.
 	 * The preference is determined as follows:
-	 * 
+	 *
 	 * <ol>
 	 *     <li>If the task execution has state to load (from a checkpoint), then the location preference
 	 *         is the location of the previous execution (if there is a previous execution attempt).
 	 *     <li>If the task execution has no state or no previous location, then the location preference
 	 *         is based on the task's inputs.
 	 * </ol>
-	 * 
-	 * These rules should result in the following behavior:
-	 * 
+	 *
+	 * <p>These rules should result in the following behavior:
+	 *
 	 * <ul>
 	 *     <li>Stateless tasks are always scheduled based on co-location with inputs.
 	 *     <li>Stateful tasks are on their initial attempt executed based on co-location with inputs.
 	 *     <li>Repeated executions of stateful tasks try to co-locate the execution with its state.
 	 * </ul>
-	 * 
-	 * @return The preferred execution locations for the execution attempt.
-	 * 
+	 *
 	 * @see #getPreferredLocationsBasedOnState()
-	 * @see #getPreferredLocationsBasedOnInputs() 
+	 * @see #getPreferredLocationsBasedOnInputs()
+	 *
+	 * @return The preferred execution locations for the execution attempt.
 	 */
 	public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocations() {
 		Collection<CompletableFuture<TaskManagerLocation>> basedOnState = getPreferredLocationsBasedOnState();
 		return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs();
 	}
-	
+
 	/**
 	 * Gets the preferred location to execute the current task execution attempt, based on the state
 	 * that the execution attempt will resume.
-	 * 
+	 *
 	 * @return A size-one collection with the location preference, or null, if there is no
 	 *         location preference based on the state.
 	 */
@@ -542,27 +543,25 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	/**
 	 * Archives the current Execution and creates a new Execution for this vertex.
-	 * 
+	 *
 	 * <p>This method atomically checks if the ExecutionGraph is still of an expected
 	 * global mod. version and replaces the execution if that is the case. If the ExecutionGraph
 	 * has increased its global mod. version in the meantime, this operation fails.
-	 * 
+	 *
 	 * <p>This mechanism can be used to prevent conflicts between various concurrent recovery and
 	 * reconfiguration actions in a similar way as "optimistic concurrency control".
-	 * 
+	 *
 	 * @param timestamp
 	 *             The creation timestamp for the new Execution
 	 * @param originatingGlobalModVersion
-	 *             The 
-	 * 
-	 * @return Returns the new created Execution. 
-	 * 
+	 *
+	 * @return Returns the newly created Execution.
+	 *
 	 * @throws GlobalModVersionMismatch Thrown, if the execution graph has a new global mod
 	 *                                  version than the one passed to this message.
 	 */
 	public Execution resetForNewExecution(final long timestamp, final long originatingGlobalModVersion)
-			throws GlobalModVersionMismatch
-	{
+			throws GlobalModVersionMismatch {
 		LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex());
 
 		synchronized (priorExecutions) {
@@ -642,12 +641,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	}
 
 	/**
-	 *  
+	 * Cancels this ExecutionVertex.
+	 *
 	 * @return A future that completes once the execution has reached its final state.
 	 */
 	public CompletableFuture<?> cancel() {
 		// to avoid any case of mixup in the presence of concurrent calls,
-		// we copy a reference to the stack to make sure both calls go to the same Execution 
+		// we copy a reference to the stack to make sure both calls go to the same Execution
 		final Execution exec = this.currentExecution;
 		exec.cancel();
 		return exec.getReleaseFuture();
@@ -742,7 +742,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Simply forward this notification
+	 * Simply forward this notification.
 	 */
 	void notifyStateTransition(Execution execution, ExecutionState newState, Throwable error) {
 		// only forward this notification if the execution is still the current execution
@@ -754,7 +754,6 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	/**
 	 * Creates a task deployment descriptor to deploy a subtask to the given target slot.
-	 *
 	 * TODO: This should actually be in the EXECUTION
 	 */
 	TaskDeploymentDescriptor createDeploymentDescriptor(
@@ -762,13 +761,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			LogicalSlot targetSlot,
 			@Nullable JobManagerTaskRestore taskRestore,
 			int attemptNumber) throws ExecutionGraphException {
-		
+
 		// Produced intermediate results
 		List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<>(resultPartitions.size());
-		
+
 		// Consumed intermediate results
 		List<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<>(inputEdges.length);
-		
+
 		boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment();
 
 		for (IntermediateResultPartition partition : resultPartitions.values()) {
@@ -791,8 +790,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 				producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, maxParallelism, lazyScheduling));
 			}
 		}
-		
-		
+
 		for (ExecutionEdge[] edges : inputEdges) {
 			InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor.fromEdges(
 				edges,