You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/06 21:29:17 UTC

[3/4] flink git commit: [FLINK-5867] [flip-1] Improve performance of Pipelined Failover Region construction

[FLINK-5867] [flip-1] Improve performance of Pipelined Failover Region construction

This method exploits the fact that vertices are already in topological order.

This closes #3773


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcfd37ca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcfd37ca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcfd37ca

Branch: refs/heads/master
Commit: dcfd37ca69380de5afbf28ad946dde90ab0de722
Parents: 166a3f8
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Apr 21 00:02:19 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat May 6 21:40:05 2017 +0200

----------------------------------------------------------------------
 .../RestartPipelinedRegionStrategy.java         | 170 +++++++++++++------
 1 file changed, 115 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dcfd37ca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
index 0a5baa8..1884d1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java
@@ -24,14 +24,16 @@ import org.apache.flink.runtime.executiongraph.ExecutionEdge;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -91,16 +93,20 @@ public class RestartPipelinedRegionStrategy extends FailoverStrategy {
 
 		if (failoverRegion == null) {
 			executionGraph.failGlobal(new FlinkException(
-					"Can not find a failover region for the execution " + ev.getTaskNameWithSubtaskIndex()));
+					"Can not find a failover region for the execution " + ev.getTaskNameWithSubtaskIndex(), cause));
 		}
 		else {
+			LOG.info("Recovering task failure for {} #{} ({}) via restart of failover region",
+					taskExecution.getVertex().getTaskNameWithSubtaskIndex(),
+					taskExecution.getAttemptNumber(),
+					taskExecution.getAttemptId());
+
 			failoverRegion.onExecutionFail(taskExecution, cause);
 		}
 	}
 
 	@Override
 	public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
-		LOG.debug("Generating failover regions for {} new job vertices", newJobVerticesTopological.size());
 		generateAllFailoverRegion(newJobVerticesTopological);
 	}
 
@@ -109,74 +115,128 @@ public class RestartPipelinedRegionStrategy extends FailoverStrategy {
 		return "Pipelined Region Failover";
 	}
 
-	// Generate all the FailoverRegion from the new added job vertexes
+	/**
+	 * Generates all the FailoverRegions from the newly added job vertices.
+	 */
 	private void generateAllFailoverRegion(List<ExecutionJobVertex> newJobVerticesTopological) {
+		final IdentityHashMap<ExecutionVertex, ArrayList<ExecutionVertex>> vertexToRegion = new IdentityHashMap<>();
+
+		// we use the map (list -> null) to imitate an IdentityHashSet (which does not exist)
+		final IdentityHashMap<ArrayList<ExecutionVertex>, Object> distinctRegions = new IdentityHashMap<>();
+
+		// this loop will worst case iterate over every edge in the graph (complexity is O(#edges))
+		
 		for (ExecutionJobVertex ejv : newJobVerticesTopological) {
-			for (ExecutionVertex ev : ejv.getTaskVertices()) {
-				if (getFailoverRegion(ev) != null) {
-					continue;
-				}
-				List<ExecutionVertex> pipelinedExecutions = new ArrayList<>();
-				List<ExecutionVertex> orgExecutions = new ArrayList<>();
-				orgExecutions.add(ev);
-				pipelinedExecutions.add(ev);
-				getAllPipelinedConnectedVertexes(orgExecutions, pipelinedExecutions);
-
-				FailoverRegion region = new FailoverRegion(executionGraph, executor, pipelinedExecutions);
-				for (ExecutionVertex vertex : pipelinedExecutions) {
-					vertexToRegion.put(vertex, region);
+
+			// currently, jobs with a co-location constraint fail as one
+			// we want to improve that in the future (or get rid of co-location constraints)
+			if (ejv.getCoLocationGroup() != null) {
+				makeAllOneRegion(newJobVerticesTopological);
+				return;
+			}
+
+			// see if this JobVertex has pipelined inputs at all
+			final List<IntermediateResult> inputs = ejv.getInputs();
+			final int numInputs = inputs.size();
+			boolean hasPipelinedInputs = false;
+
+			for (IntermediateResult input : inputs) {
+				if (input.getResultType().isPipelined()) {
+					hasPipelinedInputs = true;
+					break;
 				}
 			}
-		}
-	}
 
-	/**
-	 * Get all connected executions of the original executions
-	 *
-	 * @param orgExecutions  the original execution vertexes
-	 * @param connectedExecutions  the total connected executions
-	 */
-	private static void getAllPipelinedConnectedVertexes(List<ExecutionVertex> orgExecutions, List<ExecutionVertex> connectedExecutions) {
-		List<ExecutionVertex> newAddedExecutions = new ArrayList<>();
-		for (ExecutionVertex ev : orgExecutions) {
-			// Add downstream ExecutionVertex
-			for (IntermediateResultPartition irp : ev.getProducedPartitions().values()) {
-				if (irp.getIntermediateResult().getResultType().isPipelined()) {
-					for (List<ExecutionEdge> consumers : irp.getConsumers()) {
-						for (ExecutionEdge consumer : consumers) {
-							ExecutionVertex cev = consumer.getTarget();
-							if (!connectedExecutions.contains(cev)) {
-								newAddedExecutions.add(cev);
+			if (hasPipelinedInputs) {
+				// build upon the predecessors
+				for (ExecutionVertex ev : ejv.getTaskVertices()) {
+
+					// remember the region in which we are
+					ArrayList<ExecutionVertex> thisRegion = null;
+
+					for (int inputNum = 0; inputNum < numInputs; inputNum++) {
+						if (inputs.get(inputNum).getResultType().isPipelined()) {
+
+							for (ExecutionEdge edge : ev.getInputEdges(inputNum)) {
+								final ExecutionVertex predecessor = edge.getSource().getProducer();
+								final ArrayList<ExecutionVertex> predecessorRegion = vertexToRegion.get(predecessor);
+
+								if (thisRegion != null) {
+									// we already have a region. see if it is the same as the predecessor's region
+									if (predecessorRegion != thisRegion) {
+
+										// we need to merge our region and the predecessor's region
+										thisRegion.addAll(predecessorRegion);
+										distinctRegions.remove(predecessorRegion);
+
+										// remap the vertices from that merged region
+										for (ExecutionVertex inPredRegion: predecessorRegion) {
+											vertexToRegion.put(inPredRegion, thisRegion);
+										}
+									}
+								}
+								else if (predecessor != null) {
+									// first case, make this our region
+									thisRegion = predecessorRegion;
+									thisRegion.add(ev);
+									vertexToRegion.put(ev, thisRegion);
+								}
+								else {
+									// throw an uncaught exception here
+									// this is a bug and not a recoverable situation
+									throw new FlinkRuntimeException(
+											"bug in the logic to construct the pipelined failover regions");
+								}
 							}
 						}
 					}
 				}
 			}
-			if (!newAddedExecutions.isEmpty()) {
-				connectedExecutions.addAll(newAddedExecutions);
-				getAllPipelinedConnectedVertexes(newAddedExecutions, connectedExecutions);
-				newAddedExecutions.clear();
-			}
-			// Add upstream ExecutionVertex
-			int inputNum = ev.getNumberOfInputs();
-			for (int i = 0; i < inputNum; i++) {
-				for (ExecutionEdge input : ev.getInputEdges(i)) {
-					if (input.getSource().getIntermediateResult().getResultType().isPipelined()) {
-						ExecutionVertex pev = input.getSource().getProducer();
-						if (!connectedExecutions.contains(pev)) {
-							newAddedExecutions.add(pev);
-						}
-					}
+			else {
+				// no pipelined inputs, start a new region
+				for (ExecutionVertex ev : ejv.getTaskVertices()) {
+					ArrayList<ExecutionVertex> region = new ArrayList<>(1);
+					region.add(ev);
+					vertexToRegion.put(ev, region);
+					distinctRegions.put(region, null);
 				}
 			}
-			if (!newAddedExecutions.isEmpty()) {
-				connectedExecutions.addAll(0, newAddedExecutions);
-				getAllPipelinedConnectedVertexes(newAddedExecutions, connectedExecutions);
-				newAddedExecutions.clear();
+		}
+
+		// now that we have all regions, create the failover region objects 
+		LOG.info("Creating {} individual failover regions for job {} ({})",
+				distinctRegions.size(), executionGraph.getJobName(), executionGraph.getJobID());
+
+		for (List<ExecutionVertex> region : distinctRegions.keySet()) {
+			final FailoverRegion failoverRegion = new FailoverRegion(executionGraph, executor, region);
+			for (ExecutionVertex ev : region) {
+				this.vertexToRegion.put(ev, failoverRegion);
 			}
 		}
 	}
 
+	private void makeAllOneRegion(List<ExecutionJobVertex> jobVertices) {
+		LOG.warn("Cannot decompose ExecutionGraph into individual failover regions due to use of " +
+				"Co-Location constraints (iterations). Job will fail over as one holistic unit.");
+
+		final ArrayList<ExecutionVertex> allVertices = new ArrayList<>();
+
+		for (ExecutionJobVertex ejv : jobVertices) {
+
+			// save some incremental size growth
+			allVertices.ensureCapacity(allVertices.size() + ejv.getParallelism());
+
+			for (ExecutionVertex ev : ejv.getTaskVertices()) {
+				allVertices.add(ev);
+			}
+		}
+
+		final FailoverRegion singleRegion = new FailoverRegion(executionGraph, executor, allVertices);
+		for (ExecutionVertex ev : allVertices) {
+			vertexToRegion.put(ev, singleRegion);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  testing
 	// ------------------------------------------------------------------------