You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/23 22:06:08 UTC

[11/11] flink git commit: [hotfix] Fix checkstyle violations in ExecutionJobVertex

[hotfix] Fix checkstyle violations in ExecutionJobVertex


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc5d29ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc5d29ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc5d29ea

Branch: refs/heads/master
Commit: cc5d29ea3c45848f79f20c21bce0b3e773da21eb
Parents: 88572dd
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 20:46:37 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../executiongraph/ExecutionJobVertex.java      | 44 ++++++++++----------
 1 file changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cc5d29ea/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 0691cc7..6da1e0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -317,7 +317,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	public ExecutionGraph getGraph() {
 		return graph;
 	}
-	
+
 	public JobVertex getJobVertex() {
 		return jobVertex;
 	}
@@ -344,33 +344,33 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	public JobID getJobId() {
 		return graph.getJobID();
 	}
-	
+
 	@Override
 	public JobVertexID getJobVertexId() {
 		return jobVertex.getID();
 	}
-	
+
 	@Override
 	public ExecutionVertex[] getTaskVertices() {
 		return taskVertices;
 	}
-	
+
 	public IntermediateResult[] getProducedDataSets() {
 		return producedDataSets;
 	}
-	
+
 	public InputSplitAssigner getSplitAssigner() {
 		return splitAssigner;
 	}
-	
+
 	public SlotSharingGroup getSlotSharingGroup() {
 		return slotSharingGroup;
 	}
-	
+
 	public CoLocationGroup getCoLocationGroup() {
 		return coLocationGroup;
 	}
-	
+
 	public List<IntermediateResult> getInputs() {
 		return inputs;
 	}
@@ -423,28 +423,28 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 
 	//---------------------------------------------------------------------------------------------
-	
+
 	public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
-		
+
 		List<JobEdge> inputs = jobVertex.getInputs();
-		
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", jobVertex.getID(), jobVertex.getName(), inputs.size()));
 		}
-		
+
 		for (int num = 0; num < inputs.size(); num++) {
 			JobEdge edge = inputs.get(num);
-			
+
 			if (LOG.isDebugEnabled()) {
 				if (edge.getSource() == null) {
-					LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.", 
+					LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
 							num, jobVertex.getID(), jobVertex.getName(), edge.getSourceId()));
 				} else {
 					LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",
 							num, jobVertex.getID(), jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName()));
 				}
 			}
-			
+
 			// fetch the intermediate result via ID. if it does not exist, then it either has not been created, or the order
 			// in which this method is called for the job vertices is not a topological order
 			IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
@@ -452,18 +452,18 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 				throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
 						+ edge.getSourceId());
 			}
-			
+
 			this.inputs.add(ires);
-			
+
 			int consumerIndex = ires.registerConsumer();
-			
+
 			for (int i = 0; i < parallelism; i++) {
 				ExecutionVertex ev = taskVertices[i];
 				ev.connectSource(num, ires, edge, consumerIndex);
 			}
 		}
 	}
-	
+
 	//---------------------------------------------------------------------------------------------
 	//  Actions
 	//---------------------------------------------------------------------------------------------
@@ -480,7 +480,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 			SlotProvider slotProvider,
 			boolean queued,
 			LocationPreferenceConstraint locationPreferenceConstraint) {
-		
+
 		final ExecutionVertex[] vertices = this.taskVertices;
 
 		final ArrayList<CompletableFuture<Void>> scheduleFutures = new ArrayList<>(vertices.length);
@@ -497,9 +497,9 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	 * Acquires a slot for all the execution vertices of this ExecutionJobVertex. The method returns
 	 * pairs of the slots and execution attempts, to ease correlation between vertices and execution
 	 * attempts.
-	 * 
+	 *
 	 * <p>If this method throws an exception, it makes sure to release all so far requested slots.
-	 * 
+	 *
 	 * @param resourceProvider The resource provider from whom the slots are requested.
 	 * @param queued if the allocation can be queued
 	 * @param locationPreferenceConstraint constraint for the location preferences