You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/03 11:59:32 UTC

[flink] branch master updated: [FLINK-12246][runtime] Read MAX_ATTEMPTS_HISTORY_SIZE from cluster configuration

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a34563  [FLINK-12246][runtime] Read MAX_ATTEMPTS_HISTORY_SIZE from cluster configuration
9a34563 is described below

commit 9a345633dab41906239f21bc2c27ad3c5b6f16df
Author: dcadmin <dc...@dcadmins-mac.local>
AuthorDate: Fri May 3 16:37:36 2019 +0800

    [FLINK-12246][runtime] Read MAX_ATTEMPTS_HISTORY_SIZE from cluster configuration
    
    This closes #8268.
---
 .../runtime/executiongraph/ExecutionGraph.java     | 46 +++++++++++++++++++---
 .../executiongraph/ExecutionGraphBuilder.java      |  5 +++
 .../runtime/executiongraph/ExecutionJobVertex.java | 24 +++++------
 3 files changed, 58 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1b839d6..6a4af57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
@@ -255,6 +256,9 @@ public class ExecutionGraph implements AccessExecutionGraph {
 	 * from results than need to be materialized. */
 	private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
+	/** The maximum number of prior execution attempts kept in history. */
+	private final int maxPriorAttemptsHistoryLength;
+
 	// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
 
 	private final AtomicInteger verticesFinished;
@@ -373,12 +377,39 @@ public class ExecutionGraph implements AccessExecutionGraph {
 			timeout);
 	}
 
+	@VisibleForTesting
+	public ExecutionGraph(
+			JobInformation jobInformation,
+			ScheduledExecutorService futureExecutor,
+			Executor ioExecutor,
+			Time timeout,
+			RestartStrategy restartStrategy,
+			FailoverStrategy.Factory failoverStrategy,
+			SlotProvider slotProvider,
+			ClassLoader userClassLoader,
+			BlobWriter blobWriter,
+			Time allocationTimeout) throws IOException {
+		this(
+			jobInformation,
+			futureExecutor,
+			ioExecutor,
+			timeout,
+			restartStrategy,
+			JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue(),
+			failoverStrategy,
+			slotProvider,
+			userClassLoader,
+			blobWriter,
+			allocationTimeout);
+	}
+
 	public ExecutionGraph(
 			JobInformation jobInformation,
 			ScheduledExecutorService futureExecutor,
 			Executor ioExecutor,
 			Time rpcTimeout,
 			RestartStrategy restartStrategy,
+			int maxPriorAttemptsHistoryLength,
 			FailoverStrategy.Factory failoverStrategyFactory,
 			SlotProvider slotProvider,
 			ClassLoader userClassLoader,
@@ -423,6 +454,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
 		// is ready by the time the failover strategy sees it
 		this.failoverStrategy = checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");
 
+		this.maxPriorAttemptsHistoryLength = maxPriorAttemptsHistoryLength;
+
 		this.schedulingFuture = null;
 		this.jobMasterMainThreadExecutor =
 			new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
@@ -825,12 +858,13 @@ public class ExecutionGraph implements AccessExecutionGraph {
 
 			// create the execution job vertex and attach it to the graph
 			ExecutionJobVertex ejv = new ExecutionJobVertex(
-				this,
-				jobVertex,
-				1,
-				rpcTimeout,
-				globalModVersion,
-				createTimestamp);
+					this,
+					jobVertex,
+					1,
+					maxPriorAttemptsHistoryLength,
+					rpcTimeout,
+					globalModVersion,
+					createTimestamp);
 
 			ejv.connectToPredecessors(this.intermediateResults);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index ac4a759..fc63e4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
@@ -110,6 +111,9 @@ public class ExecutionGraphBuilder {
 			jobGraph.getUserJarBlobKeys(),
 			jobGraph.getClasspaths());
 
+		final int maxPriorAttemptsHistoryLength =
+				jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
+
 		// create a new execution graph, if none exists so far
 		final ExecutionGraph executionGraph;
 		try {
@@ -120,6 +124,7 @@ public class ExecutionGraphBuilder {
 					ioExecutor,
 					rpcTimeout,
 					restartStrategy,
+					maxPriorAttemptsHistoryLength,
 					failoverStrategy,
 					slotProvider,
 					classLoader,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 6b1887c..9254ba2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
@@ -141,18 +140,26 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	 */
 	@VisibleForTesting
 	ExecutionJobVertex(
-		ExecutionGraph graph,
-		JobVertex jobVertex,
-		int defaultParallelism,
-		Time timeout) throws JobException {
+			ExecutionGraph graph,
+			JobVertex jobVertex,
+			int defaultParallelism,
+			Time timeout) throws JobException {
 
-		this(graph, jobVertex, defaultParallelism, timeout, 1L, System.currentTimeMillis());
+		this(
+			graph,
+			jobVertex,
+			defaultParallelism,
+			JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue(),
+			timeout,
+			1L,
+			System.currentTimeMillis());
 	}
 
 	public ExecutionJobVertex(
 			ExecutionGraph graph,
 			JobVertex jobVertex,
 			int defaultParallelism,
+			int maxPriorAttemptsHistoryLength,
 			Time timeout,
 			long initialGlobalModVersion,
 			long createTimestamp) throws JobException {
@@ -214,11 +221,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 					result.getResultType());
 		}
 
-		Configuration jobConfiguration = graph.getJobConfiguration();
-		int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
-				jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
-				JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();
-
 		// create all task vertices
 		for (int i = 0; i < numTaskVertices; i++) {
 			ExecutionVertex vertex = new ExecutionVertex(