You are viewing a plain text version of this content. The canonical version is on the original mailing-list archive page (the hyperlink was lost in this plain-text copy).
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/22 14:16:44 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-28166] Configurable Automatic Retries on Error

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 39d9d10  [FLINK-28166] Configurable Automatic Retries on Error
39d9d10 is described below

commit 39d9d105b0f77298252e1ff917a2b5e30538ee9d
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Tue Jun 21 11:28:51 2022 +0200

    [FLINK-28166] Configurable Automatic Retries on Error
---
 .../kubernetes_operator_config_configuration.html  | 22 +++++-
 .../flink/kubernetes/operator/FlinkOperator.java   |  4 +
 .../config/FlinkOperatorConfiguration.java         | 47 +++++++++++-
 .../config/KubernetesOperatorConfigOptions.java    | 25 ++++++-
 .../FlinkOperatorRetryConfigurationTest.java       | 85 ++++++++++++++++++++++
 .../flink-kubernetes-operator/conf/flink-conf.yaml |  5 ++
 6 files changed, 182 insertions(+), 6 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 1bfc9de..beffac6 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -48,7 +48,7 @@
             <td><h5>kubernetes.operator.dynamic.namespaces.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>Enables dynamic change of watched/monitored namespaces. Defaults to false</td>
+            <td>Enables dynamic change of watched/monitored namespaces.</td>
         </tr>
         <tr>
             <td><h5>kubernetes.operator.job.upgrade.ignore-pending-savepoint</h5></td>
@@ -110,6 +110,24 @@
             <td>Duration</td>
             <td>The interval for the controller to reschedule the reconcile process.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.retry.initial.interval</h5></td>
+            <td style="word-wrap: break-word;">5 s</td>
+            <td>Duration</td>
+            <td>Initial interval of automatic reconcile retries on recoverable errors.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.retry.interval.multiplier</h5></td>
+            <td style="word-wrap: break-word;">2.0</td>
+            <td>Double</td>
+            <td>Interval multiplier of automatic reconcile retries on recoverable errors.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.retry.max.attempts</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>Integer</td>
+            <td>Max attempts of automatic reconcile retries on recoverable errors.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.savepoint.history.max.age</h5></td>
             <td style="word-wrap: break-word;">86400000 ms</td>
@@ -156,7 +174,7 @@
             <td><h5>kubernetes.operator.watched.namespaces</h5></td>
             <td style="word-wrap: break-word;">"JOSDK_ALL_NAMESPACES"</td>
             <td>String</td>
-            <td>Comma separated list of namespaces the operator monitors for custom resources. Defaults to JOSDK_ALL_NAMESPACES</td>
+            <td>Comma separated list of namespaces the operator monitors for custom resources.</td>
         </tr>
     </tbody>
 </table>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index cfbb450..38ed58f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -48,6 +48,7 @@ import io.javaoperatorsdk.operator.Operator;
 import io.javaoperatorsdk.operator.RegisteredController;
 import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
 import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
+import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -160,6 +161,9 @@ public class FlinkOperator {
         overrider.settingNamespace(fakeNs);
         overrider.addingNamespaces(watchedNamespaces);
         overrider.removingNamespaces(fakeNs);
+        overrider.withRetry(
+                GenericRetry.fromConfiguration(
+                        configManager.getOperatorConfiguration().getRetryConfiguration()));
     }
 
     public void run() {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index c9b1f7e..ea96278 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.operator.config;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 
+import io.javaoperatorsdk.operator.api.config.RetryConfiguration;
 import lombok.Value;
 
 import java.time.Duration;
@@ -47,6 +48,7 @@ public class FlinkOperatorConfiguration {
     String artifactsBaseDir;
     Integer savepointHistoryCountThreshold;
     Duration savepointHistoryAgeThreshold;
+    RetryConfiguration retryConfiguration;
 
     public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
         Duration reconcileInterval =
@@ -111,6 +113,8 @@ public class FlinkOperatorConfiguration {
                 operatorConfig.get(
                         KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_NAMESPACES_ENABLED);
 
+        RetryConfiguration retryConfiguration = new FlinkOperatorRetryConfiguration(operatorConfig);
+
         return new FlinkOperatorConfiguration(
                 reconcileInterval,
                 reconcilerMaxParallelism,
@@ -124,6 +128,47 @@ public class FlinkOperatorConfiguration {
                 flinkShutdownClusterTimeout,
                 artifactsBaseDir,
                 savepointHistoryCountThreshold,
-                savepointHistoryAgeThreshold);
+                savepointHistoryAgeThreshold,
+                retryConfiguration);
+    }
+
+    /** JOSDK {@link RetryConfiguration} read from the kubernetes.operator.retry.* options. */
+    protected static class FlinkOperatorRetryConfiguration implements RetryConfiguration {
+        private final int maxAttempts;
+        private final long initialInterval; // milliseconds
+        private final double intervalMultiplier;
+
+        public FlinkOperatorRetryConfiguration(Configuration operatorConfig) {
+            // All options go through the generic get(...), like the rest of this class.
+            maxAttempts =
+                    operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_RETRY_MAX_ATTEMPTS);
+            initialInterval =
+                    operatorConfig
+                            .get(KubernetesOperatorConfigOptions.OPERATOR_RETRY_INITIAL_INTERVAL)
+                            .toMillis();
+            intervalMultiplier =
+                    operatorConfig.get(
+                            KubernetesOperatorConfigOptions.OPERATOR_RETRY_INTERVAL_MULTIPLIER);
+        }
+
+        @Override
+        public int getMaxAttempts() {
+            return maxAttempts;
+        }
+
+        @Override
+        public long getInitialInterval() {
+            return initialInterval;
+        }
+
+        @Override
+        public double getIntervalMultiplier() {
+            return intervalMultiplier;
+        }
+
+        @Override
+        public long getMaxInterval() {
+            return (long) (initialInterval * Math.pow(intervalMultiplier, maxAttempts));
+        }
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 5ad1655..13a0f4c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -194,13 +194,32 @@ public class KubernetesOperatorConfigOptions {
                     .stringType()
                     .defaultValue(Constants.WATCH_ALL_NAMESPACES)
                     .withDescription(
-                            "Comma separated list of namespaces the operator monitors for custom resources. Defaults to "
-                                    + Constants.WATCH_ALL_NAMESPACES);
+                            "Comma separated list of namespaces the operator monitors for custom resources.");
 
     public static final ConfigOption<Boolean> OPERATOR_DYNAMIC_NAMESPACES_ENABLED =
             ConfigOptions.key("kubernetes.operator.dynamic.namespaces.enabled")
                     .booleanType()
                     .defaultValue(false)
+                    .withDescription("Enables dynamic change of watched/monitored namespaces.");
+
+    public static final ConfigOption<Duration> OPERATOR_RETRY_INITIAL_INTERVAL =
+            ConfigOptions.key("kubernetes.operator.retry.initial.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(5)) // read as millis by the retry config
+                    .withDescription(
+                            "Initial interval of automatic reconcile retries on recoverable errors.");
+
+    public static final ConfigOption<Double> OPERATOR_RETRY_INTERVAL_MULTIPLIER =
+            ConfigOptions.key("kubernetes.operator.retry.interval.multiplier")
+                    .doubleType()
+                    .defaultValue(2.0) // also used to derive the max retry interval
+                    .withDescription(
+                            "Interval multiplier of automatic reconcile retries on recoverable errors.");
+
+    public static final ConfigOption<Integer> OPERATOR_RETRY_MAX_ATTEMPTS =
+            ConfigOptions.key("kubernetes.operator.retry.max.attempts")
+                    .intType()
+                    .defaultValue(10) // with defaults, max retry interval = 5 s * 2.0^10
                     .withDescription(
-                            "Enables dynamic change of watched/monitored namespaces. Defaults to false");
+                            "Max attempts of automatic reconcile retries on recoverable errors.");
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorRetryConfigurationTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorRetryConfigurationTest.java
new file mode 100644
index 0000000..252c1c3
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorRetryConfigurationTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+/** {@link FlinkOperatorConfiguration.FlinkOperatorRetryConfiguration} defaults/overrides tests. */
+public class FlinkOperatorRetryConfigurationTest {
+
+    @Test
+    public void testRetryConfiguration() {
+
+        // default values
+        FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
+        Assertions.assertEquals(
+                KubernetesOperatorConfigOptions.OPERATOR_RETRY_INITIAL_INTERVAL
+                        .defaultValue()
+                        .toMillis(),
+                configManager
+                        .getOperatorConfiguration()
+                        .getRetryConfiguration()
+                        .getInitialInterval());
+        Assertions.assertEquals(
+                KubernetesOperatorConfigOptions.OPERATOR_RETRY_INTERVAL_MULTIPLIER.defaultValue(),
+                configManager
+                        .getOperatorConfiguration()
+                        .getRetryConfiguration()
+                        .getIntervalMultiplier());
+        Assertions.assertEquals(
+                KubernetesOperatorConfigOptions.OPERATOR_RETRY_MAX_ATTEMPTS.defaultValue(),
+                configManager.getOperatorConfiguration().getRetryConfiguration().getMaxAttempts());
+
+        // overrides; each differs from its default so an ignored override fails the test
+        var overrides =
+                Configuration.fromMap(
+                        Map.of(
+                                KubernetesOperatorConfigOptions.OPERATOR_RETRY_INITIAL_INTERVAL
+                                        .key(),
+                                "1 s",
+                                KubernetesOperatorConfigOptions.OPERATOR_RETRY_INTERVAL_MULTIPLIER
+                                        .key(),
+                                "3.0",
+                                KubernetesOperatorConfigOptions.OPERATOR_RETRY_MAX_ATTEMPTS.key(),
+                                "3"));
+        configManager.updateDefaultConfig(overrides);
+        Assertions.assertEquals(
+                1000L,
+                configManager
+                        .getOperatorConfiguration()
+                        .getRetryConfiguration()
+                        .getInitialInterval());
+        Assertions.assertEquals(
+                3.0,
+                configManager
+                        .getOperatorConfiguration()
+                        .getRetryConfiguration()
+                        .getIntervalMultiplier());
+        Assertions.assertEquals(
+                3,
+                configManager.getOperatorConfiguration().getRetryConfiguration().getMaxAttempts());
+        Assertions.assertEquals(
+                27000L, // 1 s * 3.0^3
+                configManager.getOperatorConfiguration().getRetryConfiguration().getMaxInterval());
+    }
+}
diff --git a/helm/flink-kubernetes-operator/conf/flink-conf.yaml b/helm/flink-kubernetes-operator/conf/flink-conf.yaml
index 8aa5835..ccc172b 100644
--- a/helm/flink-kubernetes-operator/conf/flink-conf.yaml
+++ b/helm/flink-kubernetes-operator/conf/flink-conf.yaml
@@ -39,6 +39,11 @@ parallelism.default: 2
 # kubernetes.operator.deployment.readiness.timeout: 1min
 # kubernetes.operator.user.artifacts.base.dir: /opt/flink/artifacts
 # kubernetes.operator.job.upgrade.ignore-pending-savepoint: false
+# kubernetes.operator.watched.namespaces: ns1,ns2
+# kubernetes.operator.dynamic.namespaces.enabled: false
+# kubernetes.operator.retry.initial.interval: 5 s
+# kubernetes.operator.retry.interval.multiplier: 2
+# kubernetes.operator.retry.max.attempts: 10
 
 # kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
 # kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE