You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/08 02:53:24 UTC

[GitHub] TisonKun closed pull request #6353: [FLINK-9875][runtime] Add concurrent creation of execution job vertex

TisonKun closed pull request #6353: [FLINK-9875][runtime] Add concurrent creation of execution job vertex
URL: https://github.com/apache/flink/pull/6353
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub would otherwise hide it):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index acb1e16fe71..42440182293 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -88,6 +88,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -808,6 +809,55 @@ public Executor getFutureExecutor() {
 	//  Actions
 	// --------------------------------------------------------------------------------------------
 
+	private void createExecutionJobVertex(List<JobVertex> topologiallySorted) throws JobException {
+		final List<CompletableFuture<JobException>> futures = new LinkedList<>();
+		final long createTimestamp = System.currentTimeMillis();
+
+		for (JobVertex jobVertex: topologiallySorted) {
+			futures.add(CompletableFuture.supplyAsync(() -> {
+				try {
+					ExecutionJobVertex ejv = new ExecutionJobVertex(
+						this,
+						jobVertex,
+						1,
+						rpcTimeout,
+						globalModVersion,
+						createTimestamp);
+					ExecutionJobVertex previousTask = tasks.putIfAbsent(jobVertex.getID(), ejv);
+					if (previousTask != null) {
+						throw new JobException(
+							String.format(
+								"Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
+								jobVertex.getID(), ejv, previousTask));
+					}
+					return null;
+				} catch (JobException e) {
+					return e;
+				}
+			}, futureExecutor));
+		}
+
+		try {
+			// wait for all futures done
+			Collection<JobException> exceptions = FutureUtils.combineAll(futures).get();
+
+			// suppress all optional exceptions
+			Exception suppressedException = null;
+			for (Exception exception : exceptions) {
+				if (exception != null) {
+					suppressedException = ExceptionUtils.firstOrSuppressed(exception, suppressedException);
+				}
+			}
+
+			if (suppressedException != null) {
+				throw suppressedException;
+			}
+		} catch (Exception e) {
+			throw new JobException("Could not create execution job vertex.", e);
+		}
+	}
+
+
 	public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
 
 		LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
@@ -815,7 +865,8 @@ public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobExcepti
 				topologiallySorted.size(), tasks.size(), intermediateResults.size());
 
 		final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
-		final long createTimestamp = System.currentTimeMillis();
+
+		createExecutionJobVertex(topologiallySorted);
 
 		for (JobVertex jobVertex : topologiallySorted) {
 
@@ -823,23 +874,10 @@ public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobExcepti
 				this.isStoppable = false;
 			}
 
-			// create the execution job vertex and attach it to the graph
-			ExecutionJobVertex ejv = new ExecutionJobVertex(
-				this,
-				jobVertex,
-				1,
-				rpcTimeout,
-				globalModVersion,
-				createTimestamp);
+			ExecutionJobVertex ejv = tasks.get(jobVertex.getID());
 
 			ejv.connectToPredecessors(this.intermediateResults);
 
-			ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
-			if (previousTask != null) {
-				throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
-						jobVertex.getID(), ejv, previousTask));
-			}
-
 			for (IntermediateResult res : ejv.getProducedDataSets()) {
 				IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
 				if (previousDataSet != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e385318b810..5d9a8e2f711 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -174,9 +174,11 @@ public ExecutionVertex(
 			timeout);
 
 		// create a co-location scheduling hint, if necessary
-		CoLocationGroup clg = jobVertex.getCoLocationGroup();
+		final CoLocationGroup clg = jobVertex.getCoLocationGroup();
 		if (clg != null) {
-			this.locationConstraint = clg.getLocationConstraint(subTaskIndex);
+			synchronized (clg) {
+				this.locationConstraint = clg.getLocationConstraint(subTaskIndex);
+			}
 		}
 		else {
 			this.locationConstraint = null;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services