You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/14 05:52:31 UTC
[11/12] flink git commit: [FLINK-9143] Use cluster strategy if none
was set on client side
[FLINK-9143] Use cluster strategy if none was set on client side
Added NoOrFixedIfCheckpointingEnabledRestartStrategy
This closes #6283.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57872d53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57872d53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57872d53
Branch: refs/heads/master
Commit: 57872d53c4584faace6dc8e4038ad1f2d068a453
Parents: c9ad0a0
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Thu Jul 5 13:48:23 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 18:32:57 2018 +0200
----------------------------------------------------------------------
.../flink/api/common/ExecutionConfig.java | 10 +--
.../restartstrategy/RestartStrategies.java | 48 ++++++++++++-
.../runtime/webmonitor/WebFrontendITCase.java | 2 +-
...ckpointingEnabledRestartStrategyFactory.java | 42 ++++++++++++
.../restart/NoRestartStrategy.java | 4 +-
.../restart/RestartStrategyFactory.java | 63 ++++++++---------
.../restart/RestartStrategyResolving.java | 66 ++++++++++++++++++
.../apache/flink/runtime/jobgraph/JobGraph.java | 20 +++++-
.../flink/runtime/jobmaster/JobMaster.java | 16 +++--
.../flink/runtime/jobmanager/JobManager.scala | 20 +++---
.../checkpoint/CoordinatorShutdownTest.java | 19 ++++--
.../restart/RestartStrategyResolvingTest.java | 71 ++++++++++++++++++++
.../flink/runtime/jobmaster/JobMasterTest.java | 41 +++++++++++
.../TestingJobManagerSharedServicesBuilder.java | 4 +-
.../api/graph/StreamingJobGraphGenerator.java | 15 -----
.../streaming/api/RestartStrategyTest.java | 10 ++-
16 files changed, 367 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 7a0a574..59fa803 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.Preconditions;
import com.esotericsoftware.kryo.Serializer;
@@ -138,7 +139,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
@Deprecated
private long executionRetryDelay = DEFAULT_RESTART_DELAY;
- private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration;
+ private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
+ new RestartStrategies.FallbackRestartStrategyConfiguration();
private long taskCancellationIntervalMillis = -1;
@@ -390,7 +392,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
@PublicEvolving
public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
- this.restartStrategyConfiguration = restartStrategyConfiguration;
+ this.restartStrategyConfiguration = Preconditions.checkNotNull(restartStrategyConfiguration);
}
/**
@@ -401,14 +403,14 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
@PublicEvolving
@SuppressWarnings("deprecation")
public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
- if (restartStrategyConfiguration == null) {
+ if (restartStrategyConfiguration instanceof RestartStrategies.FallbackRestartStrategyConfiguration) {
// support the old API calls by creating a restart strategy from them
if (getNumberOfExecutionRetries() > 0 && getExecutionRetryDelay() >= 0) {
return RestartStrategies.fixedDelayRestart(getNumberOfExecutionRetries(), getExecutionRetryDelay());
} else if (getNumberOfExecutionRetries() == 0) {
return RestartStrategies.noRestart();
} else {
- return null;
+ return restartStrategyConfiguration;
}
} else {
return restartStrategyConfiguration;
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
index f3eb3a5..4f67290 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.time.Time;
import java.io.Serializable;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
@@ -106,6 +107,19 @@ public class RestartStrategies {
public String getDescription() {
return "Restart deactivated.";
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ return o instanceof NoRestartStrategyConfiguration;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash();
+ }
}
/**
@@ -188,6 +202,25 @@ public class RestartStrategies {
return "Failure rate restart with maximum of " + maxFailureRate + " failures within interval " + failureInterval.toString()
+ " and fixed delay " + delayBetweenAttemptsInterval.toString();
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FailureRateRestartStrategyConfiguration that = (FailureRateRestartStrategyConfiguration) o;
+ return maxFailureRate == that.maxFailureRate &&
+ Objects.equals(failureInterval, that.failureInterval) &&
+ Objects.equals(delayBetweenAttemptsInterval, that.delayBetweenAttemptsInterval);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(maxFailureRate, failureInterval, delayBetweenAttemptsInterval);
+ }
}
/**
@@ -195,12 +228,25 @@ public class RestartStrategies {
* strategy. Useful especially when one has a custom implementation of restart strategy set via
* flink-conf.yaml.
*/
- public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration{
+ public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration {
private static final long serialVersionUID = -4441787204284085544L;
@Override
public String getDescription() {
return "Cluster level default restart strategy";
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ return o instanceof FallbackRestartStrategyConfiguration;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index fb8258a..b90277f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -313,7 +313,7 @@ public class WebFrontendITCase extends TestLogger {
assertEquals(HttpResponseStatus.OK, response.getStatus());
assertEquals("application/json; charset=UTF-8", response.getType());
assertEquals("{\"jid\":\"" + jid + "\",\"name\":\"Stoppable streaming test job\"," +
- "\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\"," +
+ "\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"Cluster level default restart strategy\"," +
"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java
new file mode 100644
index 0000000..7b5c1a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+/**
+ * Default restart strategy factory that resolves either to {@link NoRestartStrategy} or {@link FixedDelayRestartStrategy}
+ * depending on whether checkpointing was enabled.
+ */
+public class NoOrFixedIfCheckpointingEnabledRestartStrategyFactory extends RestartStrategyFactory {
+ private static final long DEFAULT_RESTART_DELAY = 0;
+
+ private static final long serialVersionUID = -1809462525812787862L;
+
+ @Override
+ public RestartStrategy createRestartStrategy() {
+ return createRestartStrategy(false);
+ }
+
+ RestartStrategy createRestartStrategy(boolean isCheckpointingEnabled) {
+ if (isCheckpointingEnabled) {
+ return new FixedDelayRestartStrategy(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY);
+ } else {
+ return new NoRestartStrategy();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
index 5502d2d..b639614 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -38,10 +38,10 @@ public class NoRestartStrategy implements RestartStrategy {
}
/**
- * Creates a NoRestartStrategy instance.
+ * Creates a NoRestartStrategyFactory instance.
*
* @param configuration Configuration object which is ignored
- * @return NoRestartStrategy instance
+ * @return NoRestartStrategyFactory instance
*/
public static NoRestartStrategyFactory createFactory(Configuration configuration) {
return new NoRestartStrategyFactory();
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index 717e1d2..f15ee0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -82,38 +82,41 @@ public abstract class RestartStrategyFactory implements Serializable {
* @throws Exception which indicates that the RestartStrategy could not be instantiated.
*/
public static RestartStrategyFactory createRestartStrategyFactory(Configuration configuration) throws Exception {
- String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, "none");
+ String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, null);
+
+ if (restartStrategyName == null) {
+ // support deprecated ConfigConstants values
+ final int numberExecutionRetries = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
+ ConfigConstants.DEFAULT_EXECUTION_RETRIES);
+ String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE);
+ String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY,
+ pauseString);
+
+ long delay;
+
+ try {
+ delay = Duration.apply(delayString).toMillis();
+ } catch (NumberFormatException nfe) {
+ if (delayString.equals(pauseString)) {
+ throw new Exception("Invalid config value for " +
+ AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + pauseString +
+ ". Value must be a valid duration (such as '10 s' or '1 min')");
+ } else {
+ throw new Exception("Invalid config value for " +
+ ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
+ ". Value must be a valid duration (such as '100 milli' or '10 s')");
+ }
+ }
+
+ if (numberExecutionRetries > 0 && delay >= 0) {
+ return new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries, delay);
+ } else {
+ return new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
+ }
+ }
switch (restartStrategyName.toLowerCase()) {
case "none":
- // support deprecated ConfigConstants values
- final int numberExecutionRetries = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
- ConfigConstants.DEFAULT_EXECUTION_RETRIES);
- String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE);
- String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY,
- pauseString);
-
- long delay;
-
- try {
- delay = Duration.apply(delayString).toMillis();
- } catch (NumberFormatException nfe) {
- if (delayString.equals(pauseString)) {
- throw new Exception("Invalid config value for " +
- AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + pauseString +
- ". Value must be a valid duration (such as '10 s' or '1 min')");
- } else {
- throw new Exception("Invalid config value for " +
- ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
- ". Value must be a valid duration (such as '100 milli' or '10 s')");
- }
- }
-
- if (numberExecutionRetries > 0 && delay >= 0) {
- return new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries, delay);
- } else {
- return NoRestartStrategy.createFactory(configuration);
- }
case "off":
case "disable":
return NoRestartStrategy.createFactory(configuration);
@@ -149,7 +152,7 @@ public abstract class RestartStrategyFactory implements Serializable {
}
// fallback in case of an error
- return NoRestartStrategy.createFactory(configuration);
+ return new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
new file mode 100644
index 0000000..ad7aa93
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+
+/**
+ * Utility class for resolving {@link RestartStrategy}.
+ */
+public final class RestartStrategyResolving {
+
+ /**
+ * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
+ * The resolving strategy is as follows:
+ * <ol>
+ * <li>Strategy set within job graph.</li>
+ * <li>Strategy set in flink-conf.yaml on the server side, unless it is set to {@link NoRestartStrategy} and checkpointing
+ * is enabled.</li>
+ * <li>If no strategy was set on client and server side and checkpointing was enabled then
+ * {@link FixedDelayRestartStrategy} is used</li>
+ * </ol>
+ *
+ * @param clientConfiguration restart configuration given within the job graph
+ * @param serverStrategyFactory default server side strategy factory
+ * @param isCheckpointingEnabled if checkpointing was enabled for the job
+ * @return resolved strategy
+ */
+ public static RestartStrategy resolve(
+ RestartStrategies.RestartStrategyConfiguration clientConfiguration,
+ RestartStrategyFactory serverStrategyFactory,
+ boolean isCheckpointingEnabled) {
+
+ final RestartStrategy clientSideRestartStrategy =
+ RestartStrategyFactory.createRestartStrategy(clientConfiguration);
+
+ if (clientSideRestartStrategy != null) {
+ return clientSideRestartStrategy;
+ } else {
+ if (serverStrategyFactory instanceof NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) {
+ return ((NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) serverStrategyFactory)
+ .createRestartStrategy(isCheckpointingEnabled);
+ } else {
+ return serverStrategyFactory.createRestartStrategy();
+ }
+ }
+ }
+
+ private RestartStrategyResolving() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index b3e03de..377f870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -329,7 +329,7 @@ public class JobGraph implements Serializable {
* Sets the settings for asynchronous snapshots. A value of {@code null} means that
* snapshotting is not enabled.
*
- * @param settings The snapshot settings, or null, to disable snapshotting.
+ * @param settings The snapshot settings
*/
public void setSnapshotSettings(JobCheckpointingSettings settings) {
this.snapshotSettings = settings;
@@ -339,13 +339,29 @@ public class JobGraph implements Serializable {
* Gets the settings for asynchronous snapshots. This method returns null, when
* checkpointing is not enabled.
*
- * @return The snapshot settings, or null, if checkpointing is not enabled.
+ * @return The snapshot settings
*/
public JobCheckpointingSettings getCheckpointingSettings() {
return snapshotSettings;
}
/**
+ * Checks whether checkpointing is enabled for this job graph.
+ *
+ * @return true if checkpointing is enabled
+ */
+ public boolean isCheckpointingEnabled() {
+
+ if (snapshotSettings == null) {
+ return false;
+ }
+
+ long checkpointInterval = snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();
+ return checkpointInterval > 0 &&
+ checkpointInterval < Long.MAX_VALUE;
+ }
+
+ /**
* Searches for a vertex with a matching ID and returns it.
*
* @param id
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7557bc3..1660f95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmaster;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
@@ -51,7 +52,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -273,11 +274,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
.deserializeValue(userCodeLoader)
.getRestartStrategy();
- this.restartStrategy = (restartStrategyConfiguration != null) ?
- RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) :
- jobManagerSharedServices.getRestartStrategyFactory().createRestartStrategy();
+ this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
+ jobManagerSharedServices.getRestartStrategyFactory(),
+ jobGraph.isCheckpointingEnabled());
- log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid);
+ log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobName, jid);
resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
@@ -1649,4 +1650,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
return CompletableFuture.completedFuture(null);
}
}
+
+ @VisibleForTesting
+ RestartStrategy getRestartStrategy() {
+ return restartStrategy;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cebff58..1c8174f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.execution.SuppressRestartsException
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
import org.apache.flink.runtime.executiongraph._
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.executiongraph.restart.{RestartStrategyFactory, RestartStrategyResolving}
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager}
@@ -1250,15 +1250,15 @@ class JobManager(
throw new JobSubmissionException(jobId, "The given job is empty")
}
- val restartStrategy =
- Option(jobGraph.getSerializedExecutionConfig()
- .deserializeValue(userCodeLoader)
- .getRestartStrategy())
- .map(RestartStrategyFactory.createRestartStrategy)
- .filter(p => p != null) match {
- case Some(strategy) => strategy
- case None => restartStrategyFactory.createRestartStrategy()
- }
+ val restartStrategyConfiguration = jobGraph
+ .getSerializedExecutionConfig
+ .deserializeValue(userCodeLoader)
+ .getRestartStrategy
+
+ val restartStrategy = RestartStrategyResolving
+ .resolve(restartStrategyConfiguration,
+ restartStrategyFactory,
+ jobGraph.isCheckpointingEnabled)
log.info(s"Using restart strategy $restartStrategy for $jobId.")
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 8a6a9d8..f6b7730 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -40,14 +42,14 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -68,7 +70,10 @@ public class CoordinatorShutdownTest extends TestLogger {
JobVertex vertex = new JobVertex("Test Vertex");
vertex.setInvokableClass(FailingBlockingInvokable.class);
List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
-
+
+ final ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.setRestartStrategy(RestartStrategies.noRestart());
+
JobGraph testGraph = new JobGraph("test job", vertex);
testGraph.setSnapshotSettings(
new JobCheckpointingSettings(
@@ -83,7 +88,9 @@ public class CoordinatorShutdownTest extends TestLogger {
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
-
+ testGraph.setExecutionConfig(executionConfig);
+
+
ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java
new file mode 100644
index 0000000..4194e97
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fallBackRestart;
+import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+
+/**
+ * Tests for {@link RestartStrategyResolving}.
+ */
+public class RestartStrategyResolvingTest extends TestLogger {
+
+ @Test
+ public void testClientSideHighestPriority() {
+ RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(noRestart(),
+ new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(2, 1000L),
+ true);
+
+ assertThat(resolvedStrategy, instanceOf(NoRestartStrategy.class));
+ }
+
+ @Test
+ public void testFixedStrategySetWhenCheckpointingEnabled() {
+ RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(fallBackRestart(),
+ new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory(),
+ true);
+
+ assertThat(resolvedStrategy, instanceOf(FixedDelayRestartStrategy.class));
+ }
+
+ @Test
+ public void testServerStrategyIsUsedSetWhenCheckpointingEnabled() {
+ RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(fallBackRestart(),
+ new FailureRateRestartStrategy.FailureRateRestartStrategyFactory(5, Time.seconds(5), Time.seconds(2)),
+ true);
+
+ assertThat(resolvedStrategy, instanceOf(FailureRateRestartStrategy.class));
+ }
+
+ @Test
+ public void testServerStrategyIsUsedSetWhenCheckpointingDisabled() {
+ RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(fallBackRestart(),
+ new FailureRateRestartStrategy.FailureRateRestartStrategyFactory(5, Time.seconds(5), Time.seconds(2)),
+ false);
+
+ assertThat(resolvedStrategy, instanceOf(FailureRateRestartStrategy.class));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index d7dc017..82fdc94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -46,6 +46,9 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -110,6 +113,8 @@ import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
/**
* Tests for {@link JobMaster}.
@@ -358,6 +363,42 @@ public class JobMasterTest extends TestLogger {
}
/**
+ * Tests that in a streaming use case where checkpointing is enabled, a fixed delay restart
+ * strategy (described as using Integer.MAX_VALUE retries) is instantiated if no other restart
+ * strategy has been specified. Only the strategy type is asserted, not the retry count.
+ */
+ @Test
+ public void testAutomaticRestartingWhenCheckpointing() throws Exception {
+ // create savepoint data; the resulting file is referenced by the restore settings below
+ final long savepointId = 42L;
+ final File savepointFile = createSavepoint(savepointId);
+
+ // set savepoint settings
+ final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath(
+ savepointFile.getAbsolutePath(),
+ true); // NOTE(review): presumably the allow-non-restored-state flag - confirm against SavepointRestoreSettings.forPath
+ final JobGraph jobGraph = createJobGraphWithCheckpointing(savepointRestoreSettings);
+ // give the HA services passed to the JobMaster a checkpoint recovery factory built from the standalone store and ID counter
+ final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
+ final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(
+ completedCheckpointStore,
+ new StandaloneCheckpointIDCounter());
+ haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+ final JobMaster jobMaster = createJobMaster(
+ new Configuration(), // NOTE(review): a fresh Configuration here, while the factory below reads the configuration field - confirm this is intended
+ jobGraph,
+ haServices,
+ new TestingJobManagerSharedServicesBuilder()
+ .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+ .build());
+
+ RestartStrategy restartStrategy = jobMaster.getRestartStrategy();
+ // the JobMaster is not started in this test; only the type of its restart strategy is inspected
+ assertNotNull(restartStrategy);
+ assertTrue(restartStrategy instanceof FixedDelayRestartStrategy);
+ }
+
+ /**
* Tests that an existing checkpoint will have precedence over a savepoint.
*/
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
index f0b232a..030e4e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
@@ -48,7 +48,7 @@ public class TestingJobManagerSharedServicesBuilder {
public TestingJobManagerSharedServicesBuilder() { // assigns test defaults to the service fields of this builder
scheduledExecutorService = TestingUtils.defaultExecutor();
libraryCacheManager = mock(LibraryCacheManager.class);
- restartStrategyFactory = new NoRestartStrategy.NoRestartStrategyFactory();
+ restartStrategyFactory = new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory(); // new default; callers can replace it via setRestartStrategyFactory(...)
stackTraceSampleCoordinator = mock(StackTraceSampleCoordinator.class);
backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 603b9e4..e905eac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
@@ -81,12 +80,6 @@ public class StreamingJobGraphGenerator {
private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
- /**
- * Restart delay used for the FixedDelayRestartStrategy in case checkpointing was enabled but
- * no restart strategy has been specified.
- */
- private static final long DEFAULT_RESTART_DELAY = 0L;
-
// ------------------------------------------------------------------------
public static JobGraph createJobGraph(StreamGraph streamGraph) {
@@ -590,17 +583,9 @@ public class StreamingJobGraphGenerator {
long interval = cfg.getCheckpointInterval();
if (interval > 0) {
-
ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
// propagate the expected behaviour for checkpoint errors to task.
executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
-
- // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
- if (executionConfig.getRestartStrategy() == null) {
- // if the user enabled checkpointing, the default number of exec retries is infinite.
- executionConfig.setRestartStrategy(
- RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
- }
} else {
// interval of max value means disable periodic checkpoint
interval = Long.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
index b231bea..03b5a53 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
@@ -33,12 +33,11 @@ import org.junit.Test;
public class RestartStrategyTest extends TestLogger {
/**
- * Tests that in a streaming use case where checkpointing is enabled, a
- * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
- * strategy has been specified.
+ * Tests that in a streaming use case where checkpointing is enabled, there is no default strategy set on the
+ * client side.
*/
@Test
- public void testAutomaticRestartingWhenCheckpointing() throws Exception {
+ public void testFallbackStrategyOnClientSideWhenCheckpointingEnabled() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500);
@@ -51,8 +50,7 @@ public class RestartStrategyTest extends TestLogger {
jobGraph.getSerializedExecutionConfig().deserializeValue(getClass().getClassLoader()).getRestartStrategy();
Assert.assertNotNull(restartStrategy);
- Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration);
- Assert.assertEquals(Integer.MAX_VALUE, ((RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy).getRestartAttempts());
+ Assert.assertTrue(restartStrategy instanceof RestartStrategies.FallbackRestartStrategyConfiguration);
}
/**