You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/08 06:16:31 UTC
[flink] branch master updated: [FLINK-10073] [sql-client] Allow setting a restart strategy in SQL Client
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 73bf45a [FLINK-10073] [sql-client] Allow setting a restart strategy in SQL Client
73bf45a is described below
commit 73bf45ab4802ca396ce7e3a0393bb999642a3d4a
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Aug 6 18:27:57 2018 +0200
[FLINK-10073] [sql-client] Allow setting a restart strategy in SQL Client
Adds support for fine-grained restart strategies per job/query.
This closes #6506.
---
docs/dev/table/sqlClient.md | 34 +++++++++++++++-
.../flink-sql-client/conf/sql-client-defaults.yaml | 6 +++
.../flink/table/client/config/Execution.java | 45 +++++++++++++++++++++-
.../flink/table/client/config/PropertyStrings.java | 18 +++++++++
.../client/gateway/local/ExecutionContext.java | 2 +
.../client/gateway/local/ExecutionContextTest.java | 11 ++++++
.../client/gateway/local/LocalExecutorITCase.java | 4 ++
.../test/resources/test-sql-client-defaults.yaml | 5 +++
8 files changed, 122 insertions(+), 3 deletions(-)
diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index d35aa59..0e8d2d6 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -108,6 +108,8 @@ Greg, 1
Both result modes can be useful during the prototyping of SQL queries.
+<span class="label label-danger">Attention</span> Queries that are executed in a batch environment can only be retrieved using the `table` result mode.
+
After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. For this, a target system that stores the results needs to be specified using the [INSERT INTO statement](sqlClient.html#detached-sql-queries). The [configuration section](sqlClient.html#configuration) explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties.
{% top %}
@@ -204,6 +206,8 @@ execution:
max-parallelism: 16 # optional: Flink's maximum parallelism (128 by default)
min-idle-state-retention: 0 # optional: table program's minimum idle state time
max-idle-state-retention: 0 # optional: table program's maximum idle state time
+ restart-strategy: # optional: restart strategy
+ type: fallback # "fallback" to global restart strategy by default
# Deployment properties allow for describing the cluster to which table programs are submitted.
@@ -227,7 +231,35 @@ Depending on the use case, a configuration can be split into multiple files. The
CLI commands > session environment file > defaults environment file
{% endhighlight %}
-Queries that are executed in a batch environment, can only be retrieved using the `table` result mode.
+#### Restart Strategies
+
+Restart strategies control how Flink jobs are restarted in case of a failure. Similar to [global restart strategies]({{ site.baseurl }}/dev/restart_strategies.html) for a Flink cluster, a more fine-grained restart configuration can be declared in an environment file.
+
+The following strategies are supported:
+
+{% highlight yaml %}
+execution:
+ # falls back to the global strategy defined in flink-conf.yaml
+ restart-strategy:
+ type: fallback
+
+ # job fails directly and no restart is attempted
+ restart-strategy:
+ type: none
+
+ # attempts a given number of times to restart the job
+ restart-strategy:
+ type: fixed-delay
+ attempts: 3 # retries before job is declared as failed (default: Integer.MAX_VALUE)
+ delay: 10000 # delay in ms between retries (default: 10 s)
+
+ # attempts as long as the maximum number of failures per time interval is not exceeded
+ restart-strategy:
+ type: failure-rate
+ max-failures-per-interval: 1 # retries in interval until failing (default: 1)
+ failure-rate-interval: 60000 # measuring interval in ms for failure rate (default: 60 s)
+ delay: 10000 # delay in ms between retries (default: 10 s)
+{% endhighlight %}
{% top %}
diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 51e6e95..8be4ce6 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -74,6 +74,12 @@ execution:
min-idle-state-retention: 0
# maximum idle state retention in ms
max-idle-state-retention: 0
+ # controls how table programs are restarted in case of failures
+ restart-strategy:
+ # strategy type
+ # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
+ type: fallback
+
#==============================================================================
# Deployment properties
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
index 0d6e6dd..b7c2893 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -18,6 +18,8 @@
package org.apache.flink.table.client.config;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.TimeCharacteristic;
import java.util.Collections;
@@ -57,10 +59,10 @@ public class Execution {
}
public TimeCharacteristic getTimeCharacteristic() {
- final String s = properties.getOrDefault(
+ final String characteristic = properties.getOrDefault(
PropertyStrings.EXECUTION_TIME_CHARACTERISTIC,
PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME);
- switch (s) {
+ switch (characteristic) {
case PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME:
return TimeCharacteristic.EventTime;
case PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME:
@@ -90,6 +92,45 @@ public class Execution {
return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_PARALLELISM, Integer.toString(128)));
}
+ public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
+ final String restartStrategy = properties.getOrDefault(
+ PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE,
+ PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK);
+ switch (restartStrategy) {
+ case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE:
+ return RestartStrategies.noRestart();
+ case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY:
+ final int attempts = Integer.parseInt(
+ properties.getOrDefault(
+ PropertyStrings.EXECUTION_RESTART_STRATEGY_ATTEMPTS,
+ Integer.toString(Integer.MAX_VALUE)));
+ final long fixedDelay = Long.parseLong(
+ properties.getOrDefault(
+ PropertyStrings.EXECUTION_RESTART_STRATEGY_DELAY,
+ Long.toString(10_000)));
+ return RestartStrategies.fixedDelayRestart(attempts, fixedDelay);
+ case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE:
+ final int failureRate = Integer.parseInt(
+ properties.getOrDefault(
+ PropertyStrings.EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL,
+ Integer.toString(1)));
+ final long failureInterval = Long.parseLong(
+ properties.getOrDefault(
+ PropertyStrings.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL,
+ Long.toString(60_000)));
+ final long attemptDelay = Long.parseLong(
+ properties.getOrDefault(
+ PropertyStrings.EXECUTION_RESTART_STRATEGY_DELAY,
+ Long.toString(10_000)));
+ return RestartStrategies.failureRateRestart(
+ failureRate,
+ Time.milliseconds(failureInterval),
+ Time.milliseconds(attemptDelay));
+ default:
+ return RestartStrategies.fallBackRestart();
+ }
+ }
+
public boolean isChangelogMode() {
return Objects.equals(
properties.get(PropertyStrings.EXECUTION_RESULT_MODE),
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
index 76e52de..2a6b001 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
@@ -57,6 +57,24 @@ public final class PropertyStrings {
public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";
+ public static final String EXECUTION_RESTART_STRATEGY_TYPE = "restart-strategy.type";
+
+ public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK = "fallback";
+
+ public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE = "none";
+
+ public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY = "fixed-delay";
+
+ public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE = "failure-rate";
+
+ public static final String EXECUTION_RESTART_STRATEGY_ATTEMPTS = "restart-strategy.attempts";
+
+ public static final String EXECUTION_RESTART_STRATEGY_DELAY = "restart-strategy.delay";
+
+ public static final String EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL = "restart-strategy.failure-rate-interval";
+
+ public static final String EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL = "restart-strategy.max-failures-per-interval";
+
public static final String DEPLOYMENT = "deployment";
public static final String DEPLOYMENT_TYPE = "type";
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 4283953..9ff6837 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -361,12 +361,14 @@ public class ExecutionContext<T> {
private ExecutionEnvironment createExecutionEnvironment() {
final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+ execEnv.setRestartStrategy(mergedEnv.getExecution().getRestartStrategy());
execEnv.setParallelism(mergedEnv.getExecution().getParallelism());
return execEnv;
}
private StreamExecutionEnvironment createStreamExecutionEnvironment() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(mergedEnv.getExecution().getRestartStrategy());
env.setParallelism(mergedEnv.getExecution().getParallelism());
env.setMaxParallelism(mergedEnv.getExecution().getMaxParallelism());
env.setStreamTimeCharacteristic(mergedEnv.getExecution().getTimeCharacteristic());
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index bc29f6f..04575c6 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.client.gateway.local;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.configuration.Configuration;
@@ -41,6 +42,7 @@ import java.util.Map;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Test for {@link ExecutionContext}.
@@ -53,7 +55,16 @@ public class ExecutionContextTest {
public void testExecutionConfig() throws Exception {
final ExecutionContext<?> context = createExecutionContext();
final ExecutionConfig config = context.createEnvironmentInstance().getExecutionConfig();
+
assertEquals(99, config.getAutoWatermarkInterval());
+
+ final RestartStrategies.RestartStrategyConfiguration restartConfig = config.getRestartStrategy();
+ assertTrue(restartConfig instanceof RestartStrategies.FailureRateRestartStrategyConfiguration);
+ final RestartStrategies.FailureRateRestartStrategyConfiguration failureRateStrategy =
+ (RestartStrategies.FailureRateRestartStrategyConfiguration) restartConfig;
+ assertEquals(10, failureRateStrategy.getMaxFailureRate());
+ assertEquals(99_000, failureRateStrategy.getFailureInterval().toMilliseconds());
+ assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds());
}
@Test
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index d8452e4..ed4ce7a 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -149,6 +149,10 @@ public class LocalExecutorITCase extends TestLogger {
expectedProperties.put("execution.max-idle-state-retention", "0");
expectedProperties.put("execution.min-idle-state-retention", "0");
expectedProperties.put("execution.result-mode", "table");
+ expectedProperties.put("execution.restart-strategy.type", "failure-rate");
+ expectedProperties.put("execution.restart-strategy.max-failures-per-interval", "10");
+ expectedProperties.put("execution.restart-strategy.failure-rate-interval", "99000");
+ expectedProperties.put("execution.restart-strategy.delay", "1000");
expectedProperties.put("deployment.response-timeout", "5000");
assertEquals(expectedProperties, actualProperties);
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index b759874..22351bc 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -117,6 +117,11 @@ execution:
min-idle-state-retention: 0
max-idle-state-retention: 0
result-mode: "$VAR_3"
+ restart-strategy:
+ type: failure-rate
+ max-failures-per-interval: 10
+ failure-rate-interval: 99000
+ delay: 1000
deployment:
response-timeout: 5000