You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mo...@apache.org on 2022/12/04 13:37:04 UTC
[flink-kubernetes-operator] branch main updated (d382c74e -> 2228e58f)
This is an automated email from the ASF dual-hosted git repository.
morhidi pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
from d382c74e [FLINK-29109] Generate random jobId for stateless upgrade mode irrespective of Flink version
new ad444ecf FLINK-29536 - Add WATCH_NAMESPACE env var to operator
new 2228e58f Remove WATCH_NAMESPACE from helm chart
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../config/FlinkOperatorConfiguration.java | 31 +++++++++++----
.../flink/kubernetes/operator/utils/EnvUtils.java | 1 +
.../operator/config/FlinkConfigManagerTest.java | 46 ++++++++++++++++++++++
.../templates/flink-operator.yaml | 8 +++-
.../templates/webhook.yaml | 8 ++--
5 files changed, 80 insertions(+), 14 deletions(-)
[flink-kubernetes-operator] 02/02: Remove WATCH_NAMESPACE from helm chart
Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
morhidi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 2228e58ff498ccbf1aeb59edb6e3277af54c2e1f
Author: Tony Garrard <ga...@uk.ibm.com>
AuthorDate: Fri Nov 18 13:37:36 2022 +0000
Remove WATCH_NAMESPACE from helm chart
Signed-off-by: A. Garrard <GA...@uk.ibm.com>
---
helm/flink-kubernetes-operator/templates/flink-operator.yaml | 8 --------
1 file changed, 8 deletions(-)
diff --git a/helm/flink-kubernetes-operator/templates/flink-operator.yaml b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
index b5e3d3f3..e8caa574 100644
--- a/helm/flink-kubernetes-operator/templates/flink-operator.yaml
+++ b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
@@ -82,10 +82,6 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- - name: WATCH_NAMESPACES
- valueFrom:
- fieldRef:
- fieldPath: metadata.annotations['olm.targetNamespaces']
- name: OPERATOR_NAME
value: {{ include "flink-operator.name" . }}
- name: FLINK_CONF_DIR
@@ -164,10 +160,6 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- - name: WATCH_NAMESPACES
- valueFrom:
- fieldRef:
- fieldPath: metadata.annotations['olm.targetNamespaces']
securityContext:
{{- toYaml .Values.webhookSecurityContext | nindent 12 }}
volumeMounts:
[flink-kubernetes-operator] 01/02: FLINK-29536 - Add WATCH_NAMESPACE env var to operator
Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
morhidi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit ad444ecf75db0db5a94de7b5a5e45ee2766fce9a
Author: Tony Garrard <ga...@uk.ibm.com>
AuthorDate: Mon Oct 31 15:21:18 2022 +0000
FLINK-29536 - Add WATCH_NAMESPACE env var to operator
Add the WATCH_NAMESPACE env var to EnvUtils and modify
flink configuration to use this if defined.
Added tests in the InformerManagerTest
Modified helm charts deployment to use references to metadata.namespace
and also olm's annotations for targetNamespaces. Modified webhooks so
olm bundle generation doesn't break i.e. they need unique names and the
apiGroup should not be any.
---
.../config/FlinkOperatorConfiguration.java | 31 +++++++++++----
.../flink/kubernetes/operator/utils/EnvUtils.java | 1 +
.../operator/config/FlinkConfigManagerTest.java | 46 ++++++++++++++++++++++
.../templates/flink-operator.yaml | 16 +++++++-
.../templates/webhook.yaml | 8 ++--
5 files changed, 88 insertions(+), 14 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index b51d6a31..f65a2f00 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.operator.config;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration;
import io.javaoperatorsdk.operator.api.config.RetryConfiguration;
@@ -33,6 +34,8 @@ import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
+import static org.apache.flink.kubernetes.operator.utils.EnvUtils.ENV_WATCH_NAMESPACES;
+
/** Configuration class for operator. */
@Value
public class FlinkOperatorConfiguration {
@@ -122,14 +125,26 @@ public class FlinkOperatorConfiguration {
// not running in k8s, simplify local development
flinkServiceHostOverride = "localhost";
}
- var watchedNamespaces =
- new HashSet<>(
- Arrays.asList(
- operatorConfig
- .get(
- KubernetesOperatorConfigOptions
- .OPERATOR_WATCHED_NAMESPACES)
- .split(NAMESPACES_SPLITTER_KEY)));
+ Set<String> watchedNamespaces = null;
+ if (EnvUtils.get(ENV_WATCH_NAMESPACES).isEmpty()) {
+ // if the env var is not set use the config file, the default if neither set is
+ // all namespaces
+ watchedNamespaces =
+ new HashSet<>(
+ Arrays.asList(
+ operatorConfig
+ .get(
+ KubernetesOperatorConfigOptions
+ .OPERATOR_WATCHED_NAMESPACES)
+ .split(NAMESPACES_SPLITTER_KEY)));
+ } else {
+ watchedNamespaces =
+ new HashSet<>(
+ Arrays.asList(
+ EnvUtils.get(ENV_WATCH_NAMESPACES)
+ .get()
+ .split(NAMESPACES_SPLITTER_KEY)));
+ }
boolean dynamicNamespacesEnabled =
operatorConfig.get(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java
index c4e169fc..39a2aaf8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EnvUtils.java
@@ -55,6 +55,7 @@ public class EnvUtils {
public static final String ENV_HOSTNAME = "HOSTNAME";
public static final String ENV_OPERATOR_NAME = "OPERATOR_NAME";
public static final String ENV_OPERATOR_NAMESPACE = "OPERATOR_NAMESPACE";
+ public static final String ENV_WATCH_NAMESPACES = "WATCH_NAMESPACES";
private static final String PROP_FILE = ".flink-kubernetes-operator.version.properties";
private static final String FAIL_MESSAGE =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
index dd6b104b..a1c0bab5 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.utils.Constants;
@@ -41,9 +42,12 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_WATCHED_NAMESPACES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -194,4 +198,46 @@ public class FlinkConfigManagerTest {
FlinkConfigManager.loadGlobalConfiguration(Optional.of(confOverrideDir.toString()));
Assertions.assertEquals(Map.of("foo", "1", "bar", "2"), conf.toMap());
}
+
+ @Test
+ public void testWatchNamespaceOverride() {
+ Map<String, String> originalEnv = System.getenv();
+ try {
+ Map<String, String> systemEnv = new HashMap<>(originalEnv);
+ // Set the env var to override the predefined config
+ systemEnv.put(EnvUtils.ENV_WATCH_NAMESPACES, "ns2,ns3");
+ TestUtils.setEnv(systemEnv);
+ // set config to watch different namespace
+ Configuration config =
+ Configuration.fromMap(Map.of(OPERATOR_WATCHED_NAMESPACES.key(), "ns1"));
+ FlinkConfigManager configManager = new FlinkConfigManager(config);
+ Set<String> namespaces =
+ configManager.getOperatorConfiguration().getWatchedNamespaces();
+ // expect namespaces to be those defined from env var
+ Assertions.assertArrayEquals(new String[] {"ns2", "ns3"}, namespaces.toArray());
+ } finally {
+ TestUtils.setEnv(originalEnv);
+ }
+ }
+
+ @Test
+ public void testWatchNamespaceOverrideWhenEmpty() {
+ Map<String, String> originalEnv = System.getenv();
+ try {
+ Map<String, String> systemEnv = new HashMap<>(originalEnv);
+ // Set the env var to override the predefined config in this case empty
+ systemEnv.put(EnvUtils.ENV_WATCH_NAMESPACES, "");
+ TestUtils.setEnv(systemEnv);
+ // set config to watch different namespace
+ Configuration config =
+ Configuration.fromMap(Map.of(OPERATOR_WATCHED_NAMESPACES.key(), "ns1"));
+ FlinkConfigManager configManager = new FlinkConfigManager(config);
+ Set<String> namespaces =
+ configManager.getOperatorConfiguration().getWatchedNamespaces();
+ // expect namespaces to be those defined from env var
+ Assertions.assertArrayEquals(new String[] {"ns1"}, namespaces.toArray());
+ } finally {
+ TestUtils.setEnv(originalEnv);
+ }
+ }
}
diff --git a/helm/flink-kubernetes-operator/templates/flink-operator.yaml b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
index ad1d9ab4..b5e3d3f3 100644
--- a/helm/flink-kubernetes-operator/templates/flink-operator.yaml
+++ b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
@@ -79,7 +79,13 @@ spec:
{{- end }}
env:
- name: OPERATOR_NAMESPACE
- value: {{ .Release.Namespace }}
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: WATCH_NAMESPACES
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.annotations['olm.targetNamespaces']
- name: OPERATOR_NAME
value: {{ include "flink-operator.name" . }}
- name: FLINK_CONF_DIR
@@ -155,7 +161,13 @@ spec:
- name: FLINK_PLUGINS_DIR
value: /opt/flink/plugins
- name: OPERATOR_NAMESPACE
- value: {{ .Release.Namespace }}
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: WATCH_NAMESPACES
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.annotations['olm.targetNamespaces']
securityContext:
{{- toYaml .Values.webhookSecurityContext | nindent 12 }}
volumeMounts:
diff --git a/helm/flink-kubernetes-operator/templates/webhook.yaml b/helm/flink-kubernetes-operator/templates/webhook.yaml
index 9436b8fa..c9d12956 100644
--- a/helm/flink-kubernetes-operator/templates/webhook.yaml
+++ b/helm/flink-kubernetes-operator/templates/webhook.yaml
@@ -85,7 +85,7 @@ metadata:
cert-manager.io/inject-ca-from: {{ .Release.Namespace }}/flink-operator-serving-cert
name: flink-operator-{{ .Release.Namespace }}-webhook-configuration
webhooks:
-- name: flinkoperator.flink.apache.org
+- name: validationwebhook.flink.apache.org
admissionReviewVersions: ["v1"]
clientConfig:
service:
@@ -94,7 +94,7 @@ webhooks:
path: /validate
failurePolicy: Fail
rules:
- - apiGroups: ["*"]
+ - apiGroups: ["flink.apache.org"]
apiVersions: ["*"]
scope: "Namespaced"
operations:
@@ -121,7 +121,7 @@ metadata:
cert-manager.io/inject-ca-from: {{ .Release.Namespace }}/flink-operator-serving-cert
name: flink-operator-{{ .Release.Namespace }}-webhook-configuration
webhooks:
- - name: flinkoperator.flink.apache.org
+ - name: mutationwebhook.flink.apache.org
admissionReviewVersions: ["v1"]
clientConfig:
service:
@@ -130,7 +130,7 @@ webhooks:
path: /mutate
failurePolicy: Fail
rules:
- - apiGroups: ["*"]
+ - apiGroups: ["flink.apache.org"]
apiVersions: ["*"]
scope: "Namespaced"
operations: