You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/08/08 06:16:31 UTC

[flink] branch master updated: [FLINK-10073] [sql-client] Allow setting a restart strategy in SQL Client

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 73bf45a  [FLINK-10073] [sql-client] Allow setting a restart strategy in SQL Client
73bf45a is described below

commit 73bf45ab4802ca396ce7e3a0393bb999642a3d4a
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Aug 6 18:27:57 2018 +0200

    [FLINK-10073] [sql-client] Allow setting a restart strategy in SQL Client
    
    Adds support for fine-grained restart strategies per job/query.
    
    This closes #6506.
---
 docs/dev/table/sqlClient.md                        | 34 +++++++++++++++-
 .../flink-sql-client/conf/sql-client-defaults.yaml |  6 +++
 .../flink/table/client/config/Execution.java       | 45 +++++++++++++++++++++-
 .../flink/table/client/config/PropertyStrings.java | 18 +++++++++
 .../client/gateway/local/ExecutionContext.java     |  2 +
 .../client/gateway/local/ExecutionContextTest.java | 11 ++++++
 .../client/gateway/local/LocalExecutorITCase.java  |  4 ++
 .../test/resources/test-sql-client-defaults.yaml   |  5 +++
 8 files changed, 122 insertions(+), 3 deletions(-)

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index d35aa59..0e8d2d6 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -108,6 +108,8 @@ Greg, 1
 
 Both result modes can be useful during the prototyping of SQL queries.
 
+<span class="label label-danger">Attention</span> Queries that are executed in a batch environment can only be retrieved using the `table` result mode.
+
 After a query is defined, it can be submitted to the cluster as a long-running, detached Flink job. For this, a target system that stores the results needs to be specified using the [INSERT INTO statement](sqlClient.html#detached-sql-queries). The [configuration section](sqlClient.html#configuration) explains how to declare table sources for reading data, how to declare table sinks for writing data, and how to configure other table program properties.
 
 {% top %}
@@ -204,6 +206,8 @@ execution:
   max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
   min-idle-state-retention: 0       # optional: table program's minimum idle state time
   max-idle-state-retention: 0       # optional: table program's maximum idle state time
+  restart-strategy:                 # optional: restart strategy
+    type: fallback                  #           "fallback" to global restart strategy by default
 
 # Deployment properties allow for describing the cluster to which table programs are submitted to.
 
@@ -227,7 +231,35 @@ Depending on the use case, a configuration can be split into multiple files. The
 CLI commands > session environment file > defaults environment file
 {% endhighlight %}
 
-Queries that are executed in a batch environment, can only be retrieved using the `table` result mode. 
+#### Restart Strategies
+
+Restart strategies control how Flink jobs are restarted in case of a failure. Similar to [global restart strategies]({{ site.baseurl }}/dev/restart_strategies.html) for a Flink cluster, a more fine-grained restart configuration can be declared in an environment file.
+
+The following strategies are supported:
+
+{% highlight yaml %}
+execution:
+  # falls back to the global strategy defined in flink-conf.yaml
+  restart-strategy:
+    type: fallback
+
+  # job fails directly and no restart is attempted
+  restart-strategy:
+    type: none
+
+  # attempts a given number of times to restart the job
+  restart-strategy:
+    type: fixed-delay
+    attempts: 3      # retries before job is declared as failed (default: Integer.MAX_VALUE)
+    delay: 10000     # delay in ms between retries (default: 10 s)
+
+  # attempts as long as the maximum number of failures per time interval is not exceeded
+  restart-strategy:
+    type: failure-rate
+    max-failures-per-interval: 1   # retries in interval until failing (default: 1)
+    failure-rate-interval: 60000   # measuring interval in ms for failure rate (default: 60 s)
+    delay: 10000                   # delay in ms between retries (default: 10 s)
+{% endhighlight %}
 
 {% top %}
 
diff --git a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
index 51e6e95..8be4ce6 100644
--- a/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
@@ -74,6 +74,12 @@ execution:
   min-idle-state-retention: 0
   # maximum idle state retention in ms
   max-idle-state-retention: 0
+  # controls how table programs are restarted in case of failures
+  restart-strategy:
+    # strategy type
+    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
+    type: fallback
+
 
 #==============================================================================
 # Deployment properties
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
index 0d6e6dd..b7c2893 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.client.config;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 
 import java.util.Collections;
@@ -57,10 +59,10 @@ public class Execution {
 	}
 
 	public TimeCharacteristic getTimeCharacteristic() {
-		final String s = properties.getOrDefault(
+		final String characteristic = properties.getOrDefault(
 			PropertyStrings.EXECUTION_TIME_CHARACTERISTIC,
 			PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME);
-		switch (s) {
+		switch (characteristic) {
 			case PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME:
 				return TimeCharacteristic.EventTime;
 			case PropertyStrings.EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME:
@@ -90,6 +92,45 @@ public class Execution {
 		return Integer.parseInt(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_PARALLELISM, Integer.toString(128)));
 	}
 
+	public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
+		final String restartStrategy = properties.getOrDefault(
+			PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE,
+			PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK);
+		switch (restartStrategy) {
+			case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE:
+				return RestartStrategies.noRestart();
+			case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY:
+				final int attempts = Integer.parseInt(
+					properties.getOrDefault(
+						PropertyStrings.EXECUTION_RESTART_STRATEGY_ATTEMPTS,
+						Integer.toString(Integer.MAX_VALUE)));
+				final long fixedDelay = Long.parseLong(
+					properties.getOrDefault(
+						PropertyStrings.EXECUTION_RESTART_STRATEGY_DELAY,
+						Long.toString(10_000)));
+				return RestartStrategies.fixedDelayRestart(attempts, fixedDelay);
+			case PropertyStrings.EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE:
+				final int failureRate = Integer.parseInt(
+					properties.getOrDefault(
+						PropertyStrings.EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL,
+						Integer.toString(1)));
+				final long failureInterval = Long.parseLong(
+					properties.getOrDefault(
+						PropertyStrings.EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL,
+						Long.toString(60_000)));
+				final long attemptDelay = Long.parseLong(
+					properties.getOrDefault(
+						PropertyStrings.EXECUTION_RESTART_STRATEGY_DELAY,
+						Long.toString(10_000)));
+				return RestartStrategies.failureRateRestart(
+					failureRate,
+					Time.milliseconds(failureInterval),
+					Time.milliseconds(attemptDelay));
+			default:
+				return RestartStrategies.fallBackRestart();
+		}
+	}
+
 	public boolean isChangelogMode() {
 		return Objects.equals(
 			properties.get(PropertyStrings.EXECUTION_RESULT_MODE),
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
index 76e52de..2a6b001 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/PropertyStrings.java
@@ -57,6 +57,24 @@ public final class PropertyStrings {
 
 	public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";
 
+	public static final String EXECUTION_RESTART_STRATEGY_TYPE = "restart-strategy.type";
+
+	public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK = "fallback";
+
+	public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE = "none";
+
+	public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY = "fixed-delay";
+
+	public static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE = "failure-rate";
+
+	public static final String EXECUTION_RESTART_STRATEGY_ATTEMPTS = "restart-strategy.attempts";
+
+	public static final String EXECUTION_RESTART_STRATEGY_DELAY = "restart-strategy.delay";
+
+	public static final String EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL = "restart-strategy.failure-rate-interval";
+
+	public static final String EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL = "restart-strategy.max-failures-per-interval";
+
 	public static final String DEPLOYMENT = "deployment";
 
 	public static final String DEPLOYMENT_TYPE = "type";
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 4283953..9ff6837 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -361,12 +361,14 @@ public class ExecutionContext<T> {
 
 		private ExecutionEnvironment createExecutionEnvironment() {
 			final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+			execEnv.setRestartStrategy(mergedEnv.getExecution().getRestartStrategy());
 			execEnv.setParallelism(mergedEnv.getExecution().getParallelism());
 			return execEnv;
 		}
 
 		private StreamExecutionEnvironment createStreamExecutionEnvironment() {
 			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setRestartStrategy(mergedEnv.getExecution().getRestartStrategy());
 			env.setParallelism(mergedEnv.getExecution().getParallelism());
 			env.setMaxParallelism(mergedEnv.getExecution().getMaxParallelism());
 			env.setStreamTimeCharacteristic(mergedEnv.getExecution().getTimeCharacteristic());
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index bc29f6f..04575c6 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
@@ -41,6 +42,7 @@ import java.util.Map;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test for {@link ExecutionContext}.
@@ -53,7 +55,16 @@ public class ExecutionContextTest {
 	public void testExecutionConfig() throws Exception {
 		final ExecutionContext<?> context = createExecutionContext();
 		final ExecutionConfig config = context.createEnvironmentInstance().getExecutionConfig();
+
 		assertEquals(99, config.getAutoWatermarkInterval());
+
+		final RestartStrategies.RestartStrategyConfiguration restartConfig = config.getRestartStrategy();
+		assertTrue(restartConfig instanceof RestartStrategies.FailureRateRestartStrategyConfiguration);
+		final RestartStrategies.FailureRateRestartStrategyConfiguration failureRateStrategy =
+			(RestartStrategies.FailureRateRestartStrategyConfiguration) restartConfig;
+		assertEquals(10, failureRateStrategy.getMaxFailureRate());
+		assertEquals(99_000, failureRateStrategy.getFailureInterval().toMilliseconds());
+		assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds());
 	}
 
 	@Test
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index d8452e4..ed4ce7a 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -149,6 +149,10 @@ public class LocalExecutorITCase extends TestLogger {
 		expectedProperties.put("execution.max-idle-state-retention", "0");
 		expectedProperties.put("execution.min-idle-state-retention", "0");
 		expectedProperties.put("execution.result-mode", "table");
+		expectedProperties.put("execution.restart-strategy.type", "failure-rate");
+		expectedProperties.put("execution.restart-strategy.max-failures-per-interval", "10");
+		expectedProperties.put("execution.restart-strategy.failure-rate-interval", "99000");
+		expectedProperties.put("execution.restart-strategy.delay", "1000");
 		expectedProperties.put("deployment.response-timeout", "5000");
 
 		assertEquals(expectedProperties, actualProperties);
diff --git a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index b759874..22351bc 100644
--- a/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -117,6 +117,11 @@ execution:
   min-idle-state-retention: 0
   max-idle-state-retention: 0
   result-mode: "$VAR_3"
+  restart-strategy:
+    type: failure-rate
+    max-failures-per-interval: 10
+    failure-rate-interval: 99000
+    delay: 1000
 
 deployment:
   response-timeout: 5000