You are viewing a plain-text version of this content. The link to the canonical version was lost when this copy was extracted.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/22 14:16:44 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-28166] Configurable Automatic Retries on Error
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 39d9d10 [FLINK-28166] Configurable Automatic Retries on Error
39d9d10 is described below
commit 39d9d105b0f77298252e1ff917a2b5e30538ee9d
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Tue Jun 21 11:28:51 2022 +0200
[FLINK-28166] Configurable Automatic Retries on Error
---
.../kubernetes_operator_config_configuration.html | 22 +++++-
.../flink/kubernetes/operator/FlinkOperator.java | 4 +
.../config/FlinkOperatorConfiguration.java | 47 +++++++++++-
.../config/KubernetesOperatorConfigOptions.java | 25 ++++++-
.../FlinkOperatorRetryConfigurationTest.java | 85 ++++++++++++++++++++++
.../flink-kubernetes-operator/conf/flink-conf.yaml | 5 ++
6 files changed, 182 insertions(+), 6 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 1bfc9de..beffac6 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -48,7 +48,7 @@
<td><h5>kubernetes.operator.dynamic.namespaces.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
- <td>Enables dynamic change of watched/monitored namespaces. Defaults to false</td>
+ <td>Enables dynamic change of watched/monitored namespaces.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.job.upgrade.ignore-pending-savepoint</h5></td>
@@ -110,6 +110,24 @@
<td>Duration</td>
<td>The interval for the controller to reschedule the reconcile process.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.retry.initial.interval</h5></td>
+ <td style="word-wrap: break-word;">5 s</td>
+ <td>Duration</td>
+ <td>Initial interval of automatic reconcile retries on recoverable errors.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.retry.interval.multiplier</h5></td>
+ <td style="word-wrap: break-word;">2.0</td>
+ <td>Double</td>
+ <td>Interval multiplier of automatic reconcile retries on recoverable errors.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.retry.max.attempts</h5></td>
+ <td style="word-wrap: break-word;">10</td>
+ <td>Integer</td>
+ <td>Max attempts of automatic reconcile retries on recoverable errors.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.savepoint.history.max.age</h5></td>
<td style="word-wrap: break-word;">86400000 ms</td>
@@ -156,7 +174,7 @@
<td><h5>kubernetes.operator.watched.namespaces</h5></td>
<td style="word-wrap: break-word;">"JOSDK_ALL_NAMESPACES"</td>
<td>String</td>
- <td>Comma separated list of namespaces the operator monitors for custom resources. Defaults to JOSDK_ALL_NAMESPACES</td>
+ <td>Comma separated list of namespaces the operator monitors for custom resources.</td>
</tr>
</tbody>
</table>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index cfbb450..38ed58f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -48,6 +48,7 @@ import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.RegisteredController;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
+import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,6 +161,9 @@ public class FlinkOperator {
overrider.settingNamespace(fakeNs);
overrider.addingNamespaces(watchedNamespaces);
overrider.removingNamespaces(fakeNs);
+ overrider.withRetry(
+ GenericRetry.fromConfiguration(
+ configManager.getOperatorConfiguration().getRetryConfiguration()));
}
public void run() {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index c9b1f7e..ea96278 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.operator.config;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import io.javaoperatorsdk.operator.api.config.RetryConfiguration;
import lombok.Value;
import java.time.Duration;
@@ -47,6 +48,7 @@ public class FlinkOperatorConfiguration {
String artifactsBaseDir;
Integer savepointHistoryCountThreshold;
Duration savepointHistoryAgeThreshold;
+ RetryConfiguration retryConfiguration;
public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
Duration reconcileInterval =
@@ -111,6 +113,8 @@ public class FlinkOperatorConfiguration {
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_NAMESPACES_ENABLED);
+ RetryConfiguration retryConfiguration = new FlinkOperatorRetryConfiguration(operatorConfig);
+
return new FlinkOperatorConfiguration(
reconcileInterval,
reconcilerMaxParallelism,
@@ -124,6 +128,47 @@ public class FlinkOperatorConfiguration {
flinkShutdownClusterTimeout,
artifactsBaseDir,
savepointHistoryCountThreshold,
- savepointHistoryAgeThreshold);
+ savepointHistoryAgeThreshold,
+ retryConfiguration);
+ }
+
+ /** Enables configurable retry mechanism for reconciliation errors. */
+ protected static class FlinkOperatorRetryConfiguration implements RetryConfiguration {
+ private final int maxAttempts;
+ private final long initialInterval;
+ private final double intervalMultiplier;
+
+ public FlinkOperatorRetryConfiguration(Configuration operatorConfig) {
+ maxAttempts =
+ operatorConfig.getInteger(
+ KubernetesOperatorConfigOptions.OPERATOR_RETRY_MAX_ATTEMPTS);
+ initialInterval =
+ operatorConfig
+ .get(KubernetesOperatorConfigOptions.OPERATOR_RETRY_INITIAL_INTERVAL)
+ .toMillis();
+ intervalMultiplier =
+ operatorConfig.getDouble(
+ KubernetesOperatorConfigOptions.OPERATOR_RETRY_INTERVAL_MULTIPLIER);
+ }
+
+ @Override
+ public int getMaxAttempts() {
+ return maxAttempts;
+ }
+
+ @Override
+ public long getInitialInterval() {
+ return initialInterval;
+ }
+
+ @Override
+ public double getIntervalMultiplier() {
+ return intervalMultiplier;
+ }
+
+ @Override
+ public long getMaxInterval() {
+ return (long) (initialInterval * Math.pow(intervalMultiplier, maxAttempts));
+ }
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 5ad1655..13a0f4c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -194,13 +194,32 @@ public class KubernetesOperatorConfigOptions {
.stringType()
.defaultValue(Constants.WATCH_ALL_NAMESPACES)
.withDescription(
- "Comma separated list of namespaces the operator monitors for custom resources. Defaults to "
- + Constants.WATCH_ALL_NAMESPACES);
+ "Comma separated list of namespaces the operator monitors for custom resources.");
public static final ConfigOption<Boolean> OPERATOR_DYNAMIC_NAMESPACES_ENABLED =
ConfigOptions.key("kubernetes.operator.dynamic.namespaces.enabled")
.booleanType()
.defaultValue(false)
+ .withDescription("Enables dynamic change of watched/monitored namespaces.");
+
+ public static final ConfigOption<Duration> OPERATOR_RETRY_INITIAL_INTERVAL =
+ ConfigOptions.key("kubernetes.operator.retry.initial.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(5))
+ .withDescription(
+ "Initial interval of automatic reconcile retries on recoverable errors.");
+
+ public static final ConfigOption<Double> OPERATOR_RETRY_INTERVAL_MULTIPLIER =
+ ConfigOptions.key("kubernetes.operator.retry.interval.multiplier")
+ .doubleType()
+ .defaultValue(2.0)
+ .withDescription(
+ "Interval multiplier of automatic reconcile retries on recoverable errors.");
+
+ public static final ConfigOption<Integer> OPERATOR_RETRY_MAX_ATTEMPTS =
+ ConfigOptions.key("kubernetes.operator.retry.max.attempts")
+ .intType()
+ .defaultValue(10)
.withDescription(
- "Enables dynamic change of watched/monitored namespaces. Defaults to false");
+ "Max attempts of automatic reconcile retries on recoverable errors.");
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorRetryConfigurationTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorRetryConfigurationTest.java
new file mode 100644
index 0000000..252c1c3
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorRetryConfigurationTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+/** {@link FlinkOperatorConfiguration.FlinkOperatorRetryConfiguration} tests. */
+public class FlinkOperatorRetryConfigurationTest {
+
+ @Test
+ public void testRetryConfiguration() {
+
+ // default values
+ FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
+ Assertions.assertEquals(
+ KubernetesOperatorConfigOptions.OPERATOR_RETRY_INITIAL_INTERVAL
+ .defaultValue()
+ .toMillis(),
+ configManager
+ .getOperatorConfiguration()
+ .getRetryConfiguration()
+ .getInitialInterval());
+ Assertions.assertEquals(
+ KubernetesOperatorConfigOptions.OPERATOR_RETRY_INTERVAL_MULTIPLIER.defaultValue(),
+ configManager
+ .getOperatorConfiguration()
+ .getRetryConfiguration()
+ .getIntervalMultiplier());
+ Assertions.assertEquals(
+ KubernetesOperatorConfigOptions.OPERATOR_RETRY_MAX_ATTEMPTS.defaultValue(),
+ configManager.getOperatorConfiguration().getRetryConfiguration().getMaxAttempts());
+
+ // overrides
+ var overrides =
+ Configuration.fromMap(
+ Map.of(
+ KubernetesOperatorConfigOptions.OPERATOR_RETRY_INITIAL_INTERVAL
+ .key(),
+ "1 s",
+ KubernetesOperatorConfigOptions.OPERATOR_RETRY_INTERVAL_MULTIPLIER
+ .key(),
+ "2.0",
+ KubernetesOperatorConfigOptions.OPERATOR_RETRY_MAX_ATTEMPTS.key(),
+ "3"));
+ configManager.updateDefaultConfig(overrides);
+ Assertions.assertEquals(
+ 1000L,
+ configManager
+ .getOperatorConfiguration()
+ .getRetryConfiguration()
+ .getInitialInterval());
+ Assertions.assertEquals(
+ 2.0,
+ configManager
+ .getOperatorConfiguration()
+ .getRetryConfiguration()
+ .getIntervalMultiplier());
+ Assertions.assertEquals(
+ 3,
+ configManager.getOperatorConfiguration().getRetryConfiguration().getMaxAttempts());
+ Assertions.assertEquals(
+ 8000L,
+ configManager.getOperatorConfiguration().getRetryConfiguration().getMaxInterval());
+ }
+}
diff --git a/helm/flink-kubernetes-operator/conf/flink-conf.yaml b/helm/flink-kubernetes-operator/conf/flink-conf.yaml
index 8aa5835..ccc172b 100644
--- a/helm/flink-kubernetes-operator/conf/flink-conf.yaml
+++ b/helm/flink-kubernetes-operator/conf/flink-conf.yaml
@@ -39,6 +39,11 @@ parallelism.default: 2
# kubernetes.operator.deployment.readiness.timeout: 1min
# kubernetes.operator.user.artifacts.base.dir: /opt/flink/artifacts
# kubernetes.operator.job.upgrade.ignore-pending-savepoint: false
+# kubernetes.operator.watched.namespaces: ns1,ns2
+# kubernetes.operator.dynamic.namespaces.enabled: false
+# kubernetes.operator.retry.initial.interval: 5 s
+# kubernetes.operator.retry.interval.multiplier: 2
+# kubernetes.operator.retry.max.attempts: 10
# kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
# kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE