Posted to commits@flink.apache.org by gy...@apache.org on 2022/11/10 18:20:59 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-29919] Support operator leader election
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 8c0de99f [FLINK-29919] Support operator leader election
8c0de99f is described below
commit 8c0de99fd25d2bbf99ea0742fb6e2607b8799d80
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Tue Nov 8 13:43:38 2022 +0100
[FLINK-29919] Support operator leader election
---
docs/content/docs/operations/configuration.md | 13 ++++++++
docs/content/docs/operations/helm.md | 2 ++
.../kubernetes_operator_config_configuration.html | 30 +++++++++++++++++
.../shortcodes/generated/system_section.html | 30 +++++++++++++++++
.../flink/kubernetes/operator/FlinkOperator.java | 26 ++++++++++-----
.../config/FlinkOperatorConfiguration.java | 28 +++++++++++++++-
.../config/KubernetesOperatorConfigOptions.java | 38 ++++++++++++++++++++++
.../kubernetes/operator/FlinkOperatorTest.java | 34 +++++++++++++++++++
.../flink-kubernetes-operator/conf/flink-conf.yaml | 2 ++
.../templates/flink-operator.yaml | 4 +--
helm/flink-kubernetes-operator/templates/rbac.yaml | 6 ++++
helm/flink-kubernetes-operator/values.yaml | 7 ++++
pom.xml | 2 +-
13 files changed, 209 insertions(+), 13 deletions(-)
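The central rule this commit introduces in `FlinkOperatorConfiguration` is that leader election is off by default, and that enabling it without a lease name is a configuration error. The sketch below restates that rule using only the JDK so it can be run in isolation. It is an illustration, not the operator's code: the class `LeaderElectionSketch` and the method `resolveLeaseName` are hypothetical names, a plain `Map` stands in for Flink's `Configuration`, `IllegalArgumentException` stands in for `IllegalConfigurationException`, and an `Optional` replaces the `null` that the commit returns when leader election is disabled.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical, JDK-only illustration of the lease-name rule in the diff below.
public class LeaderElectionSketch {

    // Option keys as documented in this commit.
    static final String ENABLED_KEY = "kubernetes.operator.leader-election.enabled";
    static final String LEASE_NAME_KEY = "kubernetes.operator.leader-election.lease-name";

    /**
     * Returns the lease name when leader election is enabled, or empty when it is disabled.
     * Throws if leader election is enabled but no lease name is configured.
     */
    static Optional<String> resolveLeaseName(Map<String, String> conf) {
        // Leader election is disabled unless explicitly enabled (the option defaults to false).
        boolean enabled = Boolean.parseBoolean(conf.getOrDefault(ENABLED_KEY, "false"));
        if (!enabled) {
            return Optional.empty();
        }

        // The lease name has no default, so it must be set explicitly.
        String leaseName = conf.get(LEASE_NAME_KEY);
        if (leaseName == null) {
            throw new IllegalArgumentException(
                    LEASE_NAME_KEY + " must be defined when operator leader election is enabled.");
        }
        return Optional.of(leaseName);
    }

    public static void main(String[] args) {
        // Disabled by default: no lease name is needed.
        System.out.println(resolveLeaseName(Map.of()));

        // Enabled with a lease name: the name is returned.
        System.out.println(
                resolveLeaseName(
                        Map.of(ENABLED_KEY, "true", LEASE_NAME_KEY, "flink-operator-lease")));

        // Enabled without a lease name: rejected.
        try {
            resolveLeaseName(Map.of(ENABLED_KEY, "true"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing fast on a missing lease name means a misconfigured standby setup is rejected when the configuration is loaded, rather than surfacing later during leader election.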
diff --git a/docs/content/docs/operations/configuration.md b/docs/content/docs/operations/configuration.md
index 9a77b697..4a328399 100644
--- a/docs/content/docs/operations/configuration.md
+++ b/docs/content/docs/operations/configuration.md
@@ -68,6 +68,19 @@ Verify whether the config value of `kubernetes.operator.reconcile.interval` is u
2022-05-28 13:08:30,115 o.a.f.k.o.c.FlinkConfigManager [INFO ] Updating default configuration to {kubernetes.operator.reconcile.interval=PT30S}
```
+## Leader Election and High Availability
+
+The operator supports high availability through leader election and standby operator instances. To enable leader election, add the following two mandatory operator configuration parameters.
+
+```yaml
+kubernetes.operator.leader-election.enabled: true
+kubernetes.operator.leader-election.lease-name: flink-operator-lease
+```
+
+The lease name must be unique within the lease namespace. For the more advanced configuration parameters, please refer to the configuration reference.
+
+Once leader election is enabled, you can increase `replicas` for the operator Deployment through the Helm chart to run standby instances for high availability.
+
## Operator Configuration Reference
### System Configuration
diff --git a/docs/content/docs/operations/helm.md b/docs/content/docs/operations/helm.md
index 491a1bae..1aaca026 100644
--- a/docs/content/docs/operations/helm.md
+++ b/docs/content/docs/operations/helm.md
@@ -63,6 +63,8 @@ The configurable parameters of the Helm chart and which default values as detail
| image.repository | The image repository of flink-kubernetes-operator. | ghcr.io/apache/flink-kubernetes-operator |
| image.pullPolicy | The image pull policy of flink-kubernetes-operator. | IfNotPresent |
| image.tag | The image tag of flink-kubernetes-operator. | latest |
+| replicas | Operator replica count. Must be 1 unless leader election is configured. | 1 |
+| strategy.type | Operator pod upgrade strategy. Must be Recreate unless leader election is configured. | Recreate |
| rbac.create | Whether to enable RBAC to create for said namespaces. | true |
| rbac.nodesRule.create | Whether to add RBAC rule to list nodes which is needed for rest-service exposed as NodePort type. | false |
| operatorPod.annotations | Custom annotations to be added to the operator pod (but not the deployment). | |
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 48bc0a63..b243d659 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -146,6 +146,36 @@
<td>String</td>
<td>Label selector of the custom resources to be watched. Please see https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors for the format supported.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.leader-election.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Enable leader election for the operator to allow running standby instances.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.leader-election.lease-duration</h5></td>
+ <td style="word-wrap: break-word;">15 s</td>
+ <td>Duration</td>
+ <td>Leader election lease duration.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.leader-election.lease-name</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Leader election lease name, must be unique for leases in the same namespace.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.leader-election.renew-deadline</h5></td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>Leader election renew deadline.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.leader-election.retry-period</h5></td>
+ <td style="word-wrap: break-word;">2 s</td>
+ <td>Duration</td>
+ <td>Leader election retry period.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.observer.progress-check.interval</h5></td>
<td style="word-wrap: break-word;">10 s</td>
diff --git a/docs/layouts/shortcodes/generated/system_section.html b/docs/layouts/shortcodes/generated/system_section.html
index 3baf1715..153c7eee 100644
--- a/docs/layouts/shortcodes/generated/system_section.html
+++ b/docs/layouts/shortcodes/generated/system_section.html
@@ -50,6 +50,36 @@
<td>Duration</td>
<td>The timeout for the observer to wait the flink rest client to return.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.leader-election.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Enable leader election for the operator to allow running standby instances.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.leader-election.lease-duration</h5></td>
+ <td style="word-wrap: break-word;">15 s</td>
+ <td>Duration</td>
+ <td>Leader election lease duration.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.leader-election.lease-name</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Leader election lease name, must be unique for leases in the same namespace.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.leader-election.renew-deadline</h5></td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>Leader election renew deadline.</td>
+ </tr>
+ <tr>
+ <td><h5>kubernetes.operator.leader-election.retry-period</h5></td>
+ <td style="word-wrap: break-word;">2 s</td>
+ <td>Duration</td>
+ <td>Leader election retry period.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.reconcile.interval</h5></td>
<td style="word-wrap: break-word;">1 min</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 78d106f3..c6362cfb 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -107,7 +107,8 @@ public class FlinkOperator {
}
private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
- int parallelism = configManager.getOperatorConfiguration().getReconcilerMaxParallelism();
+ var operatorConf = configManager.getOperatorConfiguration();
+ int parallelism = operatorConf.getReconcilerMaxParallelism();
if (parallelism == -1) {
LOG.info("Configuring operator with unbounded reconciliation thread pool.");
overrider.withExecutorService(Executors.newCachedThreadPool());
@@ -115,7 +116,8 @@ public class FlinkOperator {
LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
overrider.withConcurrentReconciliationThreads(parallelism);
}
- if (configManager.getOperatorConfiguration().isJosdkMetricsEnabled()) {
+
+ if (operatorConf.isJosdkMetricsEnabled()) {
overrider.withMetrics(new OperatorJosdkMetrics(metricGroup, configManager));
}
@@ -123,6 +125,14 @@ public class FlinkOperator {
configManager
.getDefaultConfig()
.get(KubernetesOperatorConfigOptions.OPERATOR_STOP_ON_INFORMER_ERROR));
+
+ var leaderElectionConf = operatorConf.getLeaderElectionConfiguration();
+ if (leaderElectionConf != null) {
+ overrider.withLeaderElectionConfiguration(leaderElectionConf);
+ LOG.info("Operator leader election is enabled.");
+ } else {
+ LOG.info("Operator leader election is disabled.");
+ }
}
@VisibleForTesting
@@ -172,16 +182,14 @@ public class FlinkOperator {
}
private void overrideControllerConfigs(ControllerConfigurationOverrider<?> overrider) {
- var watchNamespaces = configManager.getOperatorConfiguration().getWatchedNamespaces();
+ var operatorConf = configManager.getOperatorConfiguration();
+ var watchNamespaces = operatorConf.getWatchedNamespaces();
LOG.info("Configuring operator to watch the following namespaces: {}.", watchNamespaces);
- overrider.settingNamespaces(
- configManager.getOperatorConfiguration().getWatchedNamespaces());
+ overrider.settingNamespaces(operatorConf.getWatchedNamespaces());
- overrider.withRetry(
- GenericRetry.fromConfiguration(
- configManager.getOperatorConfiguration().getRetryConfiguration()));
+ overrider.withRetry(GenericRetry.fromConfiguration(operatorConf.getRetryConfiguration()));
- var labelSelector = configManager.getOperatorConfiguration().getLabelSelector();
+ var labelSelector = operatorConf.getLabelSelector();
LOG.info(
"Configuring operator to select custom resources with the {} labels.",
labelSelector);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index e43f4d25..b51d6a31 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -19,8 +19,10 @@
package org.apache.flink.kubernetes.operator.config;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
+import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration;
import io.javaoperatorsdk.operator.api.config.RetryConfiguration;
import lombok.Value;
import org.apache.commons.lang3.StringUtils;
@@ -60,6 +62,7 @@ public class FlinkOperatorConfiguration {
int exceptionFieldLengthThreshold;
int exceptionThrowableCountThreshold;
String labelSelector;
+ LeaderElectionConfiguration leaderElectionConfiguration;
public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
Duration reconcileInterval =
@@ -170,7 +173,30 @@ public class FlinkOperatorConfiguration {
exceptionStackTraceLengthThreshold,
exceptionFieldLengthThreshold,
exceptionThrowableCountThreshold,
- labelSelector);
+ labelSelector,
+ getLeaderElectionConfig(operatorConfig));
+ }
+
+ private static LeaderElectionConfiguration getLeaderElectionConfig(Configuration conf) {
+ if (!conf.get(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_ENABLED)) {
+ return null;
+ }
+
+ return new LeaderElectionConfiguration(
+ conf.getOptional(
+ KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_LEASE_NAME)
+ .orElseThrow(
+ () ->
+ new IllegalConfigurationException(
+ KubernetesOperatorConfigOptions
+ .OPERATOR_LEADER_ELECTION_LEASE_NAME
+ .key()
+ + " must be defined when operator leader election is enabled.")),
+ null,
+ conf.get(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_LEASE_DURATION),
+ conf.get(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_RENEW_DEADLINE),
+ conf.get(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_RETRY_PERIOD),
+ null);
}
/** Enables configurable retry mechanism for reconciliation errors. */
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 9053531c..508de68e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
+import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
import java.time.Duration;
@@ -384,4 +385,41 @@ public class KubernetesOperatorConfigOptions {
.booleanType()
.defaultValue(false)
.withDescription("Whether to restart failed jobs.");
+
+ @Documentation.Section(SECTION_SYSTEM)
+ public static final ConfigOption<Boolean> OPERATOR_LEADER_ELECTION_ENABLED =
+ operatorConfig("leader-election.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Enable leader election for the operator to allow running standby instances.");
+
+ @Documentation.Section(SECTION_SYSTEM)
+ public static final ConfigOption<String> OPERATOR_LEADER_ELECTION_LEASE_NAME =
+ operatorConfig("leader-election.lease-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Leader election lease name, must be unique for leases in the same namespace.");
+
+ @Documentation.Section(SECTION_SYSTEM)
+ public static final ConfigOption<Duration> OPERATOR_LEADER_ELECTION_LEASE_DURATION =
+ operatorConfig("leader-election.lease-duration")
+ .durationType()
+ .defaultValue(LeaderElectionConfiguration.LEASE_DURATION_DEFAULT_VALUE)
+ .withDescription("Leader election lease duration.");
+
+ @Documentation.Section(SECTION_SYSTEM)
+ public static final ConfigOption<Duration> OPERATOR_LEADER_ELECTION_RENEW_DEADLINE =
+ operatorConfig("leader-election.renew-deadline")
+ .durationType()
+ .defaultValue(LeaderElectionConfiguration.RENEW_DEADLINE_DEFAULT_VALUE)
+ .withDescription("Leader election renew deadline.");
+
+ @Documentation.Section(SECTION_SYSTEM)
+ public static final ConfigOption<Duration> OPERATOR_LEADER_ELECTION_RETRY_PERIOD =
+ operatorConfig("leader-election.retry-period")
+ .durationType()
+ .defaultValue(LeaderElectionConfiguration.RETRY_PERIOD_DEFAULT_VALUE)
+ .withDescription("Leader election retry period.");
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
index 5b01c493..974e5f7f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
@@ -18,8 +18,10 @@
package org.apache.flink.kubernetes.operator;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import io.fabric8.kubernetes.client.Config;
import io.javaoperatorsdk.operator.RegisteredController;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
@@ -28,6 +30,8 @@ import org.junit.jupiter.api.Test;
import java.util.concurrent.ThreadPoolExecutor;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
* @link FlinkOperator unit tests. Since at the time of writing this the JOSDK does not support
* overriding the configuration multiple times (has a singleton @link
@@ -40,12 +44,20 @@ public class FlinkOperatorTest {
public void testConfigurationPassedToJOSDK() {
var testParallelism = 42;
var testSelector = "flink=enabled";
+ var testLeaseName = "test-lease";
+
var operatorConfig = new Configuration();
+ // We need to set this property so the operator can configure the lease namespace
+ System.setProperty(Config.KUBERNETES_NAMESPACE_SYSTEM_PROPERTY, "test_namespace");
+
operatorConfig.setInteger(
KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_PARALLELISM, testParallelism);
operatorConfig.set(KubernetesOperatorConfigOptions.OPERATOR_LABEL_SELECTOR, testSelector);
operatorConfig.set(KubernetesOperatorConfigOptions.OPERATOR_STOP_ON_INFORMER_ERROR, false);
+ operatorConfig.set(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_ENABLED, true);
+ operatorConfig.set(
+ KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_LEASE_NAME, testLeaseName);
var testOperator = new FlinkOperator(operatorConfig);
testOperator.registerDeploymentController();
@@ -68,6 +80,13 @@ public class FlinkOperatorTest {
Assertions.assertFalse(
ConfigurationServiceProvider.instance().stopOnInformerErrorDuringStartup());
+ var leaderElectionConfiguration =
+ ConfigurationServiceProvider.instance().getLeaderElectionConfiguration().get();
+
+ Assertions.assertEquals(testLeaseName, leaderElectionConfiguration.getLeaseName());
+ Assertions.assertFalse(leaderElectionConfiguration.getLeaseNamespace().isPresent());
+ Assertions.assertFalse(leaderElectionConfiguration.getIdentity().isPresent());
+
// TODO: Overriding operator configuration twice in JOSDK v3 yields IllegalStateException
var secondParallelism = 420;
var secondConfig = new Configuration();
@@ -77,4 +96,19 @@ public class FlinkOperatorTest {
Assertions.assertThrows(IllegalStateException.class, () -> new FlinkOperator(secondConfig));
}
+
+ @Test
+ public void testLeaderElectionConfig() {
+ var operatorConfig = new Configuration();
+ operatorConfig.set(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_ENABLED, true);
+
+ var ice =
+ Assertions.assertThrows(
+ IllegalConfigurationException.class,
+ () -> new FlinkOperator(operatorConfig));
+ assertTrue(
+ ice.getMessage()
+ .startsWith(
+ "kubernetes.operator.leader-election.lease-name must be defined"));
+ }
}
diff --git a/helm/flink-kubernetes-operator/conf/flink-conf.yaml b/helm/flink-kubernetes-operator/conf/flink-conf.yaml
index 540012ac..58173b68 100644
--- a/helm/flink-kubernetes-operator/conf/flink-conf.yaml
+++ b/helm/flink-kubernetes-operator/conf/flink-conf.yaml
@@ -43,6 +43,8 @@ parallelism.default: 1
# kubernetes.operator.exception.stacktrace.max.length: 2048
# kubernetes.operator.exception.field.max.length: 2048
# kubernetes.operator.exception.throwable.list.max.count: 2
+# kubernetes.operator.leader-election.enabled: false
+# kubernetes.operator.leader-election.lease-name: flink-operator-lease
# kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
# kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE
diff --git a/helm/flink-kubernetes-operator/templates/flink-operator.yaml b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
index c51609c7..ad1d9ab4 100644
--- a/helm/flink-kubernetes-operator/templates/flink-operator.yaml
+++ b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
@@ -24,9 +24,9 @@ metadata:
labels:
{{- include "flink-operator.labels" . | nindent 4 }}
spec:
- replicas: 1
+ replicas: {{ .Values.replicas }}
strategy:
- type: Recreate
+ {{- toYaml .Values.strategy | nindent 4 }}
selector:
matchLabels:
{{- include "flink-operator.selectorLabels" . | nindent 6 }}
diff --git a/helm/flink-kubernetes-operator/templates/rbac.yaml b/helm/flink-kubernetes-operator/templates/rbac.yaml
index 52b87b62..f50852e7 100644
--- a/helm/flink-kubernetes-operator/templates/rbac.yaml
+++ b/helm/flink-kubernetes-operator/templates/rbac.yaml
@@ -69,6 +69,12 @@ rules:
- ingresses
verbs:
- "*"
+ - apiGroups:
+ - coordination.k8s.io
+ resources:
+ - leases
+ verbs:
+ - "*"
{{- end }}
{{/*
diff --git a/helm/flink-kubernetes-operator/values.yaml b/helm/flink-kubernetes-operator/values.yaml
index d777635d..48f42cbd 100644
--- a/helm/flink-kubernetes-operator/values.yaml
+++ b/helm/flink-kubernetes-operator/values.yaml
@@ -29,6 +29,13 @@ image:
imagePullSecrets: []
+# Replicas must be 1 unless operator leader election is configured
+replicas: 1
+
+# Strategy type must be Recreate unless leader election is configured
+strategy:
+ type: Recreate
+
rbac:
create: true
# kubernetes.rest-service.exposed.type: NodePort requires
diff --git a/pom.xml b/pom.xml
index 743975ee..328df12a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,7 +69,7 @@ under the License.
<maven-resources-plugin.version>3.2.0</maven-resources-plugin.version>
<git-commit-id-maven-plugin.version>5.0.0</git-commit-id-maven-plugin.version>
- <operator.sdk.version>4.1.0</operator.sdk.version>
+ <operator.sdk.version>4.1.1</operator.sdk.version>
<operator.sdk.admission-controller.version>0.2.0</operator.sdk.admission-controller.version>
<fabric8.version>6.2.0</fabric8.version>