You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/03 11:59:32 UTC
[flink] branch master updated: [FLINK-12246][runtime] Read MAX_ATTEMPTS_HISTORY_SIZE from cluster configuration
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9a34563 [FLINK-12246][runtime] Read MAX_ATTEMPTS_HISTORY_SIZE from cluster configuration
9a34563 is described below
commit 9a345633dab41906239f21bc2c27ad3c5b6f16df
Author: dcadmin <dc...@dcadmins-mac.local>
AuthorDate: Fri May 3 16:37:36 2019 +0800
[FLINK-12246][runtime] Read MAX_ATTEMPTS_HISTORY_SIZE from cluster configuration
This closes #8268.
---
.../runtime/executiongraph/ExecutionGraph.java | 46 +++++++++++++++++++---
.../executiongraph/ExecutionGraphBuilder.java | 5 +++
.../runtime/executiongraph/ExecutionJobVertex.java | 24 +++++------
3 files changed, 58 insertions(+), 17 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1b839d6..6a4af57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
@@ -255,6 +256,9 @@ public class ExecutionGraph implements AccessExecutionGraph {
* from results than need to be materialized. */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
+ /** The maximum number of prior execution attempts kept in history. */
+ private final int maxPriorAttemptsHistoryLength;
+
// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
private final AtomicInteger verticesFinished;
@@ -373,12 +377,39 @@ public class ExecutionGraph implements AccessExecutionGraph {
timeout);
}
+ @VisibleForTesting
+ public ExecutionGraph(
+ JobInformation jobInformation,
+ ScheduledExecutorService futureExecutor,
+ Executor ioExecutor,
+ Time timeout,
+ RestartStrategy restartStrategy,
+ FailoverStrategy.Factory failoverStrategy,
+ SlotProvider slotProvider,
+ ClassLoader userClassLoader,
+ BlobWriter blobWriter,
+ Time allocationTimeout) throws IOException {
+ this(
+ jobInformation,
+ futureExecutor,
+ ioExecutor,
+ timeout,
+ restartStrategy,
+ JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue(),
+ failoverStrategy,
+ slotProvider,
+ userClassLoader,
+ blobWriter,
+ allocationTimeout);
+ }
+
public ExecutionGraph(
JobInformation jobInformation,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
Time rpcTimeout,
RestartStrategy restartStrategy,
+ int maxPriorAttemptsHistoryLength,
FailoverStrategy.Factory failoverStrategyFactory,
SlotProvider slotProvider,
ClassLoader userClassLoader,
@@ -423,6 +454,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
// is ready by the time the failover strategy sees it
this.failoverStrategy = checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");
+ this.maxPriorAttemptsHistoryLength = maxPriorAttemptsHistoryLength;
+
this.schedulingFuture = null;
this.jobMasterMainThreadExecutor =
new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
@@ -825,12 +858,13 @@ public class ExecutionGraph implements AccessExecutionGraph {
// create the execution job vertex and attach it to the graph
ExecutionJobVertex ejv = new ExecutionJobVertex(
- this,
- jobVertex,
- 1,
- rpcTimeout,
- globalModVersion,
- createTimestamp);
+ this,
+ jobVertex,
+ 1,
+ maxPriorAttemptsHistoryLength,
+ rpcTimeout,
+ globalModVersion,
+ createTimestamp);
ejv.connectToPredecessors(this.intermediateResults);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index ac4a759..fc63e4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
@@ -110,6 +111,9 @@ public class ExecutionGraphBuilder {
jobGraph.getUserJarBlobKeys(),
jobGraph.getClasspaths());
+ final int maxPriorAttemptsHistoryLength =
+ jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
+
// create a new execution graph, if none exists so far
final ExecutionGraph executionGraph;
try {
@@ -120,6 +124,7 @@ public class ExecutionGraphBuilder {
ioExecutor,
rpcTimeout,
restartStrategy,
+ maxPriorAttemptsHistoryLength,
failoverStrategy,
slotProvider,
classLoader,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 6b1887c..9254ba2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
@@ -141,18 +140,26 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
*/
@VisibleForTesting
ExecutionJobVertex(
- ExecutionGraph graph,
- JobVertex jobVertex,
- int defaultParallelism,
- Time timeout) throws JobException {
+ ExecutionGraph graph,
+ JobVertex jobVertex,
+ int defaultParallelism,
+ Time timeout) throws JobException {
- this(graph, jobVertex, defaultParallelism, timeout, 1L, System.currentTimeMillis());
+ this(
+ graph,
+ jobVertex,
+ defaultParallelism,
+ JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue(),
+ timeout,
+ 1L,
+ System.currentTimeMillis());
}
public ExecutionJobVertex(
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
+ int maxPriorAttemptsHistoryLength,
Time timeout,
long initialGlobalModVersion,
long createTimestamp) throws JobException {
@@ -214,11 +221,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
result.getResultType());
}
- Configuration jobConfiguration = graph.getJobConfiguration();
- int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
- jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
- JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();
-
// create all task vertices
for (int i = 0; i < numTaskVertices; i++) {
ExecutionVertex vertex = new ExecutionVertex(