You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/18 16:43:31 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-26183] Support kubernetes-operator metrics using the Flink metric system
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new abf3a09 [FLINK-26183] Support kubernetes-operator metrics using the Flink metric system
abf3a09 is described below
commit abf3a09757a2fa187bffc439fd280c4bfd661e99
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Mon Feb 14 18:04:53 2022 +0100
[FLINK-26183] Support kubernetes-operator metrics using the Flink metric system
Closes #6
---
Dockerfile | 3 +-
flink-kubernetes-operator/pom.xml | 54 +++++++++++++
.../flink/kubernetes/operator/FlinkOperator.java | 5 ++
.../metrics/KubernetesOperatorMetricGroup.java | 81 +++++++++++++++++++
.../metrics/KubernetesOperatorMetricOptions.java | 30 +++++++
.../metrics/KubernetesOperatorScopeFormat.java | 45 +++++++++++
.../operator/metrics/OperatorMetricUtils.java | 57 +++++++++++++
.../kubernetes/operator/utils/FlinkUtils.java | 13 ++-
.../metrics/KubernetesOperatorMetricGroupTest.java | 93 ++++++++++++++++++++++
helm/flink-operator/templates/flink-operator.yaml | 14 ++++
10 files changed, 390 insertions(+), 5 deletions(-)
diff --git a/Dockerfile b/Dockerfile
index 5209db5..2eda986 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -17,8 +17,8 @@
################################################################################
# Build
FROM maven:3.8.4-openjdk-11 AS build
-WORKDIR /app
+WORKDIR /app
ENV OPERATOR_DIR=flink-kubernetes-operator
ENV WEBHOOK_DIR=flink-kubernetes-webhook
@@ -44,6 +44,7 @@ ENV WEBHOOK_JAR=flink-kubernetes-webhook-$OPERATOR_VERSION-shaded.jar
COPY --from=build /app/flink-kubernetes-operator/target/$OPERATOR_JAR /
COPY --from=build /app/flink-kubernetes-webhook/target/$WEBHOOK_JAR /
+COPY --from=build /app/flink-kubernetes-operator/target/plugins /opt/flink/plugins
COPY docker-entrypoint.sh /
ENTRYPOINT ["/docker-entrypoint.sh"]
diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml
index 45845d2..2e1fcc4 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -33,6 +33,7 @@ under the License.
<properties>
<awaitility.version>4.1.0</awaitility.version>
+ <plugins.tmp.dir>${project.build.directory}/plugins</plugins.tmp.dir>
</properties>
<dependencies>
@@ -144,6 +145,59 @@ under the License.
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-datadog</artifactId>
+ <version>${flink.version}</version>
+ <outputDirectory>${plugins.tmp.dir}/flink-metrics-datadog</outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-graphite</artifactId>
+ <version>${flink.version}</version>
+ <outputDirectory>${plugins.tmp.dir}/flink-metrics-graphite</outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-influxdb</artifactId>
+ <version>${flink.version}</version>
+ <outputDirectory>${plugins.tmp.dir}/flink-metrics-influxdb</outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-prometheus</artifactId>
+ <version>${flink.version}</version>
+ <outputDirectory>${plugins.tmp.dir}/flink-metrics-prometheus</outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-slf4j</artifactId>
+ <version>${flink.version}</version>
+ <outputDirectory>${plugins.tmp.dir}/flink-metrics-slf4j</outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-statsd</artifactId>
+ <version>${flink.version}</version>
+ <outputDirectory>${plugins.tmp.dir}/flink-metrics-statsd</outputDirectory>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </plugin>
</plugins>
</build>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 6482576..8cfdef1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -18,10 +18,12 @@
package org.apache.flink.kubernetes.operator;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
+import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.javaoperatorsdk.operator.Operator;
@@ -33,10 +35,13 @@ import org.slf4j.LoggerFactory;
/** Main Class for Flink native k8s operator. */
public class FlinkOperator {
private static final Logger LOG = LoggerFactory.getLogger(FlinkOperator.class);
+ private static final String ENV_FLINK_OPERATOR_CONF_DIR = "FLINK_OPERATOR_CONF_DIR";
public static void main(String... args) {
LOG.info("Starting Flink Kubernetes Operator");
+ OperatorMetricUtils.initOperatorMetrics(
+ FlinkUtils.loadConfiguration(System.getenv().get(ENV_FLINK_OPERATOR_CONF_DIR)));
DefaultKubernetesClient client = new DefaultKubernetesClient();
String namespace = client.getNamespace();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
new file mode 100644
index 0000000..5d1de59
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+
+import java.util.Map;
+
+/**
+ * Metric group for the Flink Kubernetes operator, built on the Flink metric system.
+ *
+ * <p>The scope components are produced by {@link KubernetesOperatorScopeFormat} from the
+ * operator namespace, operator name and hostname. The same three values are exposed as metric
+ * variables.
+ */
+public class KubernetesOperatorMetricGroup
+        extends AbstractMetricGroup<KubernetesOperatorMetricGroup> {
+
+    private static final String GROUP_NAME = "kubernetes-operator";
+
+    // Assigned once in the constructor and never modified afterwards.
+    private final String namespace;
+    private final String name;
+    private final String hostname;
+
+    /** Use {@link #create} to obtain instances. */
+    private KubernetesOperatorMetricGroup(
+            MetricRegistry registry,
+            String[] scope,
+            String namespace,
+            String name,
+            String hostname) {
+        // The third super-constructor argument is passed as null.
+        super(registry, scope, null);
+        this.namespace = namespace;
+        this.name = name;
+        this.hostname = hostname;
+    }
+
+    /**
+     * Creates the operator metric group.
+     *
+     * @param metricRegistry registry the group is created with
+     * @param configuration configuration the scope format is read from
+     * @param namespace value bound to the {@code <namespace>} scope variable
+     * @param name value bound to the {@code <name>} scope variable
+     * @param hostname value bound to the {@code <host>} scope variable
+     * @return a new metric group whose scope is formatted from the given values
+     */
+    public static KubernetesOperatorMetricGroup create(
+            MetricRegistry metricRegistry,
+            Configuration configuration,
+            String namespace,
+            String name,
+            String hostname) {
+        return new KubernetesOperatorMetricGroup(
+                metricRegistry,
+                KubernetesOperatorScopeFormat.fromConfig(configuration)
+                        .formatScope(namespace, name, hostname),
+                namespace,
+                name,
+                hostname);
+    }
+
+    /** Adds the namespace, name and host variables of this group to {@code variables}. */
+    @Override
+    protected final void putVariables(Map<String, String> variables) {
+        variables.put(KubernetesOperatorScopeFormat.NAMESPACE, namespace);
+        variables.put(KubernetesOperatorScopeFormat.NAME, name);
+        variables.put(ScopeFormat.SCOPE_HOST, hostname);
+    }
+
+    /** Returns the fixed group name {@code kubernetes-operator}; the filter is not applied. */
+    @Override
+    protected final String getGroupName(CharacterFilter filter) {
+        return GROUP_NAME;
+    }
+
+    /**
+     * Not implemented for the operator metric group.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    protected final QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
new file mode 100644
index 0000000..5d092f8
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Configuration options for metrics. */
+public class KubernetesOperatorMetricOptions {
+ public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR =
+ ConfigOptions.key("metrics.scope.kubernetes-operator")
+ .defaultValue("<host>.kubernetes-operator.<namespace>.<name>")
+ .withDescription(
+ "Defines the scope format string that is applied to all metrics scoped to the kubernetes operator.");
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorScopeFormat.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorScopeFormat.java
new file mode 100644
index 0000000..81d2056
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorScopeFormat.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.SCOPE_NAMING_KUBERNETES_OPERATOR;
+
+/** Scope format for the metrics reported by the Kubernetes operator. */
+public class KubernetesOperatorScopeFormat extends ScopeFormat {
+
+    public static final String NAMESPACE = asVariable("namespace");
+    public static final String NAME = asVariable("name");
+
+    /**
+     * Creates a scope format from the given format string.
+     *
+     * @param format scope format string; may reference the namespace, name and host variables
+     */
+    public KubernetesOperatorScopeFormat(String format) {
+        super(format, null, new String[] {NAMESPACE, NAME, SCOPE_HOST});
+    }
+
+    /**
+     * Binds the given values to the scope template.
+     *
+     * <p>The values are supplied in the same order as the variables declared in the
+     * constructor: namespace, name, host.
+     *
+     * @return the scope components with the variables replaced by the given values
+     */
+    public String[] formatScope(String namespace, String name, String hostname) {
+        return bindVariables(copyTemplate(), new String[] {namespace, name, hostname});
+    }
+
+    /**
+     * Creates the scope format from the value of {@code SCOPE_NAMING_KUBERNETES_OPERATOR} in
+     * the given configuration.
+     */
+    public static KubernetesOperatorScopeFormat fromConfig(Configuration config) {
+        return new KubernetesOperatorScopeFormat(
+                config.getString(SCOPE_NAMING_KUBERNETES_OPERATOR));
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
new file mode 100644
index 0000000..5dfe6ba
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.ReporterSetup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
+
+/** Helpers for hooking the operator into the Flink metric system. */
+public class OperatorMetricUtils {
+
+    private static final String ENV_HOSTNAME = "HOSTNAME";
+    private static final String ENV_OPERATOR_NAME = "OPERATOR_NAME";
+    private static final String ENV_OPERATOR_NAMESPACE = "OPERATOR_NAMESPACE";
+
+    /**
+     * Creates the metric registry and the operator metric group, then instantiates the status
+     * metrics under a {@code Status} sub-group.
+     *
+     * <p>Namespace, name and hostname are read from the environment, falling back to
+     * {@code default}, {@code flink-operator} and {@code localhost} respectively.
+     *
+     * @param configuration configuration used for the plugin manager, registry and scope format
+     */
+    public static void initOperatorMetrics(Configuration configuration) {
+        MetricRegistry registry =
+                createMetricRegistry(
+                        configuration,
+                        PluginUtils.createPluginManagerFromRootFolder(configuration));
+
+        String operatorNamespace = envOrDefault(ENV_OPERATOR_NAMESPACE, "default");
+        String operatorName = envOrDefault(ENV_OPERATOR_NAME, "flink-operator");
+        String operatorHost = envOrDefault(ENV_HOSTNAME, "localhost");
+
+        KubernetesOperatorMetricGroup rootGroup =
+                KubernetesOperatorMetricGroup.create(
+                        registry, configuration, operatorNamespace, operatorName, operatorHost);
+        MetricUtils.instantiateStatusMetrics(rootGroup.addGroup("Status"));
+    }
+
+    /** Returns the environment variable {@code key}, or {@code fallback} when it is not set. */
+    private static String envOrDefault(String key, String fallback) {
+        return System.getenv().getOrDefault(key, fallback);
+    }
+
+    /** Builds the registry from the configuration and the reporters set up from it. */
+    private static MetricRegistryImpl createMetricRegistry(
+            Configuration configuration, PluginManager pluginManager) {
+        MetricRegistryConfiguration registryConfiguration =
+                MetricRegistryConfiguration.fromConfiguration(configuration, Long.MAX_VALUE);
+        return new MetricRegistryImpl(
+                registryConfiguration,
+                ReporterSetup.fromConfiguration(configuration, pluginManager));
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index e9fbac0..dfcc2ff 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -61,10 +61,7 @@ public class FlinkUtils {
try {
String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR);
- Configuration effectiveConfig =
- flinkConfDir != null
- ? GlobalConfiguration.loadConfiguration(flinkConfDir)
- : new Configuration();
+ Configuration effectiveConfig = loadConfiguration(flinkConfDir);
effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
@@ -167,6 +164,14 @@ public class FlinkUtils {
}
}
+    /**
+     * Loads the configuration from the given directory.
+     *
+     * @param confDir configuration directory, may be {@code null}
+     * @return the configuration loaded from {@code confDir}, or an empty configuration when
+     *     {@code confDir} is {@code null}
+     */
+    public static Configuration loadConfiguration(String confDir) {
+        if (confDir == null) {
+            return new Configuration();
+        }
+        return GlobalConfiguration.loadConfiguration(confDir);
+    }
+
private static String createTempFile(Pod podTemplate) throws IOException {
File tmp = File.createTempFile("podTemplate_", ".yaml");
Files.write(tmp.toPath(), SerializationUtils.dumpAsYaml(podTemplate).getBytes());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
new file mode 100644
index 0000000..3d1d510
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
+import org.junit.Test;
+
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.SCOPE_NAMING_KUBERNETES_OPERATOR;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** @link KubernetesOperatorMetricGroup tests. */
+public class KubernetesOperatorMetricGroupTest {
+
+ @Test
+ public void testGenerateScopeDefault() throws Exception {
+ Configuration configuration = new Configuration();
+ MetricRegistryImpl registry = new MetricRegistryImpl(fromConfiguration(configuration));
+ KubernetesOperatorMetricGroup group =
+ KubernetesOperatorMetricGroup.create(
+ registry, configuration, "default", "flink-operator", "localhost");
+ assertArrayEquals(
+ new String[] {"localhost", "kubernetes-operator", "default", "flink-operator"},
+ group.getScopeComponents());
+ assertEquals(
+ "localhost.kubernetes-operator.default.flink-operator.test",
+ group.getMetricIdentifier("test"));
+
+ assertEquals(
+ ImmutableMap.of(
+ "<host>",
+ "localhost",
+ "<namespace>",
+ "default",
+ "<name>",
+ "flink-operator"),
+ group.getAllVariables());
+
+ registry.shutdown().get();
+ }
+
+ @Test
+ public void testGenerateScopeCustom() throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.setString(SCOPE_NAMING_KUBERNETES_OPERATOR, "foo.<host>.<name>.<namespace>");
+ MetricRegistryImpl registry = new MetricRegistryImpl(fromConfiguration(configuration));
+
+ KubernetesOperatorMetricGroup group =
+ KubernetesOperatorMetricGroup.create(
+ registry, configuration, "default", "flink-operator", "localhost");
+ assertArrayEquals(
+ new String[] {"foo", "localhost", "flink-operator", "default"},
+ group.getScopeComponents());
+ assertEquals(
+ "foo.localhost.flink-operator.default.test", group.getMetricIdentifier("test"));
+
+ assertEquals(
+ ImmutableMap.of(
+ "<host>",
+ "localhost",
+ "<namespace>",
+ "default",
+ "<name>",
+ "flink-operator"),
+ group.getAllVariables());
+
+ registry.shutdown().get();
+ }
+
+ private static MetricRegistryConfiguration fromConfiguration(Configuration configuration) {
+ return MetricRegistryConfiguration.fromConfiguration(configuration, Long.MAX_VALUE);
+ }
+}
diff --git a/helm/flink-operator/templates/flink-operator.yaml b/helm/flink-operator/templates/flink-operator.yaml
index 0d1b414..7225dc0 100644
--- a/helm/flink-operator/templates/flink-operator.yaml
+++ b/helm/flink-operator/templates/flink-operator.yaml
@@ -44,8 +44,16 @@ spec:
imagePullPolicy: {{ .Values.image.pullPolicy }}
command: ["/docker-entrypoint.sh", "operator"]
env:
+ - name: OPERATOR_NAMESPACE
+ value: {{ .Values.operatorNamespace.name }}
+ - name: OPERATOR_NAME
+ value: {{ include "flink-operator.name" . }}
- name: FLINK_CONF_DIR
value: /opt/flink/conf
+ - name: FLINK_PLUGINS_DIR
+ value: /opt/flink/plugins
+ - name: FLINK_OPERATOR_CONF_DIR
+ value: /opt/flink-operator/conf
- name: LOG_CONFIG
value: -Dlog4j.configurationFile=/opt/flink-operator/conf/log4j2.properties
volumeMounts:
@@ -90,6 +98,8 @@ spec:
configMap:
name: flink-operator-config
items:
+ - key: flink-conf.yaml
+ path: flink-conf.yaml
- key: log4j2.properties
path: log4j2.properties
- name: flink-default-config-volume
@@ -118,6 +128,10 @@ metadata:
labels:
{{- include "flink-operator.labels" . | nindent 4 }}
data:
+ flink-conf.yaml: |+
+ metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
+ metrics.reporter.slf4j.interval: 1 MINUTE
+ # metrics.scope.kubernetes-operator: <host>.kubernetes-operator.<namespace>.<name>
log4j2.properties: |+
rootLogger.level = DEBUG
rootLogger.appenderRef.console.ref = ConsoleAppender