You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mo...@apache.org on 2022/12/04 13:37:05 UTC
[flink-kubernetes-operator] 01/02: FLINK-29536 - Add WATCH_NAMESPACE env var to operator
This is an automated email from the ASF dual-hosted git repository.
morhidi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit ad444ecf75db0db5a94de7b5a5e45ee2766fce9a
Author: Tony Garrard <ga...@uk.ibm.com>
AuthorDate: Mon Oct 31 15:21:18 2022 +0000
FLINK-29536 - Add WATCH_NAMESPACE env var to operator
Add the WATCH_NAMESPACES env var to EnvUtils and modify
the flink operator configuration to use it when defined.
Added tests in FlinkConfigManagerTest.
Modified the helm chart deployment to use a reference to metadata.namespace
and OLM's olm.targetNamespaces annotation. Modified the webhooks so that
OLM bundle generation doesn't break, i.e. they need unique names and the
apiGroups must not be the wildcard "*".
---
.../config/FlinkOperatorConfiguration.java | 31 +++++++++++----
.../flink/kubernetes/operator/utils/EnvUtils.java | 1 +
.../operator/config/FlinkConfigManagerTest.java | 46 ++++++++++++++++++++++
.../templates/flink-operator.yaml | 16 +++++++-
.../templates/webhook.yaml | 8 ++--
5 files changed, 88 insertions(+), 14 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index b51d6a31..f65a2f00 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.operator.config;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration;
import io.javaoperatorsdk.operator.api.config.RetryConfiguration;
@@ -33,6 +34,8 @@ import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
+import static org.apache.flink.kubernetes.operator.utils.EnvUtils.ENV_WATCH_NAMESPACES;
+
/** Configuration class for operator. */
@Value
public class FlinkOperatorConfiguration {
@@ -122,14 +125,26 @@ public class FlinkOperatorConfiguration {
// not running in k8s, simplify local development
flinkServiceHostOverride = "localhost";
}
- var watchedNamespaces =
- new HashSet<>(
- Arrays.asList(
- operatorConfig
- .get(
- KubernetesOperatorConfigOptions
- .OPERATOR_WATCHED_NAMESPACES)
- .split(NAMESPACES_SPLITTER_KEY)));
+ Set<String> watchedNamespaces = null;
+ if (EnvUtils.get(ENV_WATCH_NAMESPACES).isEmpty()) {
+ // If the env var is not set, use the config file; when neither is set, the
+ // default is to watch all namespaces.
+ watchedNamespaces =
+ new HashSet<>(
+ Arrays.asList(
+ operatorConfig
+ .get(
+ KubernetesOperatorConfigOptions
+ .OPERATOR_WATCHED_NAMESPACES)
+ .split(NAMESPACES_SPLITTER_KEY)));
+ } else {
+ watchedNamespaces =
+ new HashSet<>(
+ Arrays.asList(
+ EnvUtils.get(ENV_WATCH_NAMESPACES)
+ .get()
+ .split(NAMESPACES_SPLITTER_KEY)));
+ }
boolean dynamicNamespacesEnabled =
operatorConfig.get(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java
index c4e169fc..39a2aaf8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java
@@ -55,6 +55,7 @@ public class EnvUtils {
public static final String ENV_HOSTNAME = "HOSTNAME";
public static final String ENV_OPERATOR_NAME = "OPERATOR_NAME";
public static final String ENV_OPERATOR_NAMESPACE = "OPERATOR_NAMESPACE";
+ public static final String ENV_WATCH_NAMESPACES = "WATCH_NAMESPACES";
private static final String PROP_FILE = ".flink-kubernetes-operator.version.properties";
private static final String FAIL_MESSAGE =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
index dd6b104b..a1c0bab5 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.utils.Constants;
@@ -41,9 +42,12 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_WATCHED_NAMESPACES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -194,4 +198,46 @@ public class FlinkConfigManagerTest {
FlinkConfigManager.loadGlobalConfiguration(Optional.of(confOverrideDir.toString()));
Assertions.assertEquals(Map.of("foo", "1", "bar", "2"), conf.toMap());
}
+
+ @Test
+ public void testWatchNamespaceOverride() {
+ Map<String, String> originalEnv = System.getenv();
+ try {
+ Map<String, String> systemEnv = new HashMap<>(originalEnv);
+ // Set the env var to a namespace list that differs from the config value below
+ systemEnv.put(EnvUtils.ENV_WATCH_NAMESPACES, "ns2,ns3");
+ TestUtils.setEnv(systemEnv);
+ // Configure a different watched namespace so the two sources can be told apart
+ Configuration config =
+ Configuration.fromMap(Map.of(OPERATOR_WATCHED_NAMESPACES.key(), "ns1"));
+ FlinkConfigManager configManager = new FlinkConfigManager(config);
+ Set<String> namespaces =
+ configManager.getOperatorConfiguration().getWatchedNamespaces();
+ // The env var wins; compare as sets because the set's iteration order is unspecified
+ Assertions.assertEquals(Set.of("ns2", "ns3"), namespaces);
+ } finally {
+ TestUtils.setEnv(originalEnv);
+ }
+ }
+
+ @Test
+ public void testWatchNamespaceOverrideWhenEmpty() {
+ Map<String, String> originalEnv = System.getenv();
+ try {
+ Map<String, String> systemEnv = new HashMap<>(originalEnv);
+ // Set the env var to an empty string, which must not override the config
+ systemEnv.put(EnvUtils.ENV_WATCH_NAMESPACES, "");
+ TestUtils.setEnv(systemEnv);
+ // Configure a watched namespace that should be used as the fallback
+ Configuration config =
+ Configuration.fromMap(Map.of(OPERATOR_WATCHED_NAMESPACES.key(), "ns1"));
+ FlinkConfigManager configManager = new FlinkConfigManager(config);
+ Set<String> namespaces =
+ configManager.getOperatorConfiguration().getWatchedNamespaces();
+ // The env var is empty, so expect the namespaces defined in the config
+ Assertions.assertArrayEquals(new String[] {"ns1"}, namespaces.toArray());
+ } finally {
+ TestUtils.setEnv(originalEnv);
+ }
+ }
}
diff --git a/helm/flink-kubernetes-operator/templates/flink-operator.yaml b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
index ad1d9ab4..b5e3d3f3 100644
--- a/helm/flink-kubernetes-operator/templates/flink-operator.yaml
+++ b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
@@ -79,7 +79,13 @@ spec:
{{- end }}
env:
- name: OPERATOR_NAMESPACE
- value: {{ .Release.Namespace }}
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: WATCH_NAMESPACES
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.annotations['olm.targetNamespaces']
- name: OPERATOR_NAME
value: {{ include "flink-operator.name" . }}
- name: FLINK_CONF_DIR
@@ -155,7 +161,13 @@ spec:
- name: FLINK_PLUGINS_DIR
value: /opt/flink/plugins
- name: OPERATOR_NAMESPACE
- value: {{ .Release.Namespace }}
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: WATCH_NAMESPACES
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.annotations['olm.targetNamespaces']
securityContext:
{{- toYaml .Values.webhookSecurityContext | nindent 12 }}
volumeMounts:
diff --git a/helm/flink-kubernetes-operator/templates/webhook.yaml b/helm/flink-kubernetes-operator/templates/webhook.yaml
index 9436b8fa..c9d12956 100644
--- a/helm/flink-kubernetes-operator/templates/webhook.yaml
+++ b/helm/flink-kubernetes-operator/templates/webhook.yaml
@@ -85,7 +85,7 @@ metadata:
cert-manager.io/inject-ca-from: {{ .Release.Namespace }}/flink-operator-serving-cert
name: flink-operator-{{ .Release.Namespace }}-webhook-configuration
webhooks:
-- name: flinkoperator.flink.apache.org
+- name: validationwebhook.flink.apache.org
admissionReviewVersions: ["v1"]
clientConfig:
service:
@@ -94,7 +94,7 @@ webhooks:
path: /validate
failurePolicy: Fail
rules:
- - apiGroups: ["*"]
+ - apiGroups: ["flink.apache.org"]
apiVersions: ["*"]
scope: "Namespaced"
operations:
@@ -121,7 +121,7 @@ metadata:
cert-manager.io/inject-ca-from: {{ .Release.Namespace }}/flink-operator-serving-cert
name: flink-operator-{{ .Release.Namespace }}-webhook-configuration
webhooks:
- - name: flinkoperator.flink.apache.org
+ - name: mutationwebhook.flink.apache.org
admissionReviewVersions: ["v1"]
clientConfig:
service:
@@ -130,7 +130,7 @@ webhooks:
path: /mutate
failurePolicy: Fail
rules:
- - apiGroups: ["*"]
+ - apiGroups: ["flink.apache.org"]
apiVersions: ["*"]
scope: "Namespaced"
operations: