You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/23 22:06:08 UTC
[11/11] flink git commit: [hotfix] Fix checkstyle violations in ExecutionJobVertex
[hotfix] Fix checkstyle violations in ExecutionJobVertex
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc5d29ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc5d29ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc5d29ea
Branch: refs/heads/master
Commit: cc5d29ea3c45848f79f20c21bce0b3e773da21eb
Parents: 88572dd
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 20:46:37 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200
----------------------------------------------------------------------
.../executiongraph/ExecutionJobVertex.java | 44 ++++++++++----------
1 file changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cc5d29ea/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 0691cc7..6da1e0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -317,7 +317,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
public ExecutionGraph getGraph() {
return graph;
}
-
+
public JobVertex getJobVertex() {
return jobVertex;
}
@@ -344,33 +344,33 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
public JobID getJobId() {
return graph.getJobID();
}
-
+
@Override
public JobVertexID getJobVertexId() {
return jobVertex.getID();
}
-
+
@Override
public ExecutionVertex[] getTaskVertices() {
return taskVertices;
}
-
+
public IntermediateResult[] getProducedDataSets() {
return producedDataSets;
}
-
+
public InputSplitAssigner getSplitAssigner() {
return splitAssigner;
}
-
+
public SlotSharingGroup getSlotSharingGroup() {
return slotSharingGroup;
}
-
+
public CoLocationGroup getCoLocationGroup() {
return coLocationGroup;
}
-
+
public List<IntermediateResult> getInputs() {
return inputs;
}
@@ -423,28 +423,28 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
//---------------------------------------------------------------------------------------------
-
+
public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
-
+
List<JobEdge> inputs = jobVertex.getInputs();
-
+
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", jobVertex.getID(), jobVertex.getName(), inputs.size()));
}
-
+
for (int num = 0; num < inputs.size(); num++) {
JobEdge edge = inputs.get(num);
-
+
if (LOG.isDebugEnabled()) {
if (edge.getSource() == null) {
- LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
+ LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
num, jobVertex.getID(), jobVertex.getName(), edge.getSourceId()));
} else {
LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",
num, jobVertex.getID(), jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName()));
}
}
-
+
// fetch the intermediate result via ID. if it does not exist, then it either has not been created, or the order
// in which this method is called for the job vertices is not a topological order
IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
@@ -452,18 +452,18 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
+ edge.getSourceId());
}
-
+
this.inputs.add(ires);
-
+
int consumerIndex = ires.registerConsumer();
-
+
for (int i = 0; i < parallelism; i++) {
ExecutionVertex ev = taskVertices[i];
ev.connectSource(num, ires, edge, consumerIndex);
}
}
}
-
+
//---------------------------------------------------------------------------------------------
// Actions
//---------------------------------------------------------------------------------------------
@@ -480,7 +480,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint) {
-
+
final ExecutionVertex[] vertices = this.taskVertices;
final ArrayList<CompletableFuture<Void>> scheduleFutures = new ArrayList<>(vertices.length);
@@ -497,9 +497,9 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
* Acquires a slot for all the execution vertices of this ExecutionJobVertex. The method returns
* pairs of the slots and execution attempts, to ease correlation between vertices and execution
* attempts.
- *
+ *
* <p>If this method throws an exception, it makes sure to release all so far requested slots.
- *
+ *
* @param resourceProvider The resource provider from whom the slots are requested.
* @param queued if the allocation can be queued
* @param locationPreferenceConstraint constraint for the location preferences