You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/18 16:43:31 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-26183] Support kubernetes-operator metrics using the Flink metric system

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new abf3a09  [FLINK-26183] Support kubernetes-operator metrics using the Flink metric system
abf3a09 is described below

commit abf3a09757a2fa187bffc439fd280c4bfd661e99
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Mon Feb 14 18:04:53 2022 +0100

    [FLINK-26183] Support kubernetes-operator metrics using the Flink metric system
    
    Closes #6
---
 Dockerfile                                         |  3 +-
 flink-kubernetes-operator/pom.xml                  | 54 +++++++++++++
 .../flink/kubernetes/operator/FlinkOperator.java   |  5 ++
 .../metrics/KubernetesOperatorMetricGroup.java     | 81 +++++++++++++++++++
 .../metrics/KubernetesOperatorMetricOptions.java   | 30 +++++++
 .../metrics/KubernetesOperatorScopeFormat.java     | 45 +++++++++++
 .../operator/metrics/OperatorMetricUtils.java      | 57 +++++++++++++
 .../kubernetes/operator/utils/FlinkUtils.java      | 13 ++-
 .../metrics/KubernetesOperatorMetricGroupTest.java | 93 ++++++++++++++++++++++
 helm/flink-operator/templates/flink-operator.yaml  | 14 ++++
 10 files changed, 390 insertions(+), 5 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 5209db5..2eda986 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -17,8 +17,8 @@
 ################################################################################
 # Build
 FROM maven:3.8.4-openjdk-11 AS build
-WORKDIR /app
 
+WORKDIR /app
 ENV OPERATOR_DIR=flink-kubernetes-operator
 ENV WEBHOOK_DIR=flink-kubernetes-webhook
 
@@ -44,6 +44,7 @@ ENV WEBHOOK_JAR=flink-kubernetes-webhook-$OPERATOR_VERSION-shaded.jar
 
 COPY --from=build /app/flink-kubernetes-operator/target/$OPERATOR_JAR /
 COPY --from=build /app/flink-kubernetes-webhook/target/$WEBHOOK_JAR /
+COPY --from=build /app/flink-kubernetes-operator/target/plugins /opt/flink/plugins
 
 COPY docker-entrypoint.sh /
 ENTRYPOINT ["/docker-entrypoint.sh"]
diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml
index 45845d2..2e1fcc4 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -33,6 +33,7 @@ under the License.
 
     <properties>
         <awaitility.version>4.1.0</awaitility.version>
+        <plugins.tmp.dir>${project.build.directory}/plugins</plugins.tmp.dir>
     </properties>
 
     <dependencies>
@@ -144,6 +145,59 @@ under the License.
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <artifactItems>
+                        <artifactItem>
+                            <groupId>org.apache.flink</groupId>
+                            <artifactId>flink-metrics-datadog</artifactId>
+                            <version>${flink.version}</version>
+                            <outputDirectory>${plugins.tmp.dir}/flink-metrics-datadog</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.flink</groupId>
+                            <artifactId>flink-metrics-graphite</artifactId>
+                            <version>${flink.version}</version>
+                            <outputDirectory>${plugins.tmp.dir}/flink-metrics-graphite</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.flink</groupId>
+                            <artifactId>flink-metrics-influxdb</artifactId>
+                            <version>${flink.version}</version>
+                            <outputDirectory>${plugins.tmp.dir}/flink-metrics-influxdb</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.flink</groupId>
+                            <artifactId>flink-metrics-prometheus</artifactId>
+                            <version>${flink.version}</version>
+                            <outputDirectory>${plugins.tmp.dir}/flink-metrics-prometheus</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.flink</groupId>
+                            <artifactId>flink-metrics-slf4j</artifactId>
+                            <version>${flink.version}</version>
+                            <outputDirectory>${plugins.tmp.dir}/flink-metrics-slf4j</outputDirectory>
+                        </artifactItem>
+                        <artifactItem>
+                            <groupId>org.apache.flink</groupId>
+                            <artifactId>flink-metrics-statsd</artifactId>
+                            <version>${flink.version}</version>
+                            <outputDirectory>${plugins.tmp.dir}/flink-metrics-statsd</outputDirectory>
+                        </artifactItem>
+                    </artifactItems>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 6482576..8cfdef1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -18,10 +18,12 @@
 package org.apache.flink.kubernetes.operator;
 
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
+import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
 import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.javaoperatorsdk.operator.Operator;
@@ -33,10 +35,13 @@ import org.slf4j.LoggerFactory;
 /** Main Class for Flink native k8s operator. */
 public class FlinkOperator {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkOperator.class);
+    private static final String ENV_FLINK_OPERATOR_CONF_DIR = "FLINK_OPERATOR_CONF_DIR";
 
     public static void main(String... args) {
 
         LOG.info("Starting Flink Kubernetes Operator");
+        OperatorMetricUtils.initOperatorMetrics(
+                FlinkUtils.loadConfiguration(System.getenv().get(ENV_FLINK_OPERATOR_CONF_DIR)));
 
         DefaultKubernetesClient client = new DefaultKubernetesClient();
         String namespace = client.getNamespace();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
new file mode 100644
index 0000000..5d1de59
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+
+import java.util.Map;
+
+/** Metric group for the Flink Kubernetes operator, scoped by its namespace, name and hostname. */
+public class KubernetesOperatorMetricGroup
+        extends AbstractMetricGroup<KubernetesOperatorMetricGroup> {
+
+    private static final String GROUP_NAME = "kubernetes-operator";
+    private final String namespace;
+    private final String name;
+    private final String hostname;
+
+    private KubernetesOperatorMetricGroup(
+            MetricRegistry registry,
+            String[] scope,
+            String namespace,
+            String name,
+            String hostname) {
+        super(registry, scope, null); // NOTE(review): null is presumably the parent group - confirm
+        this.namespace = namespace;
+        this.name = name;
+        this.hostname = hostname;
+    }
+
+    public static KubernetesOperatorMetricGroup create(
+            MetricRegistry metricRegistry,
+            Configuration configuration,
+            String namespace,
+            String name,
+            String hostname) {
+        return new KubernetesOperatorMetricGroup(
+                metricRegistry,
+                KubernetesOperatorScopeFormat.fromConfig(configuration)
+                        .formatScope(namespace, name, hostname),
+                namespace,
+                name,
+                hostname);
+    }
+
+    @Override
+    protected final void putVariables(Map<String, String> variables) {
+        variables.put(KubernetesOperatorScopeFormat.NAMESPACE, namespace);
+        variables.put(KubernetesOperatorScopeFormat.NAME, name);
+        variables.put(ScopeFormat.SCOPE_HOST, hostname);
+    }
+
+    @Override
+    protected final String getGroupName(CharacterFilter filter) {
+        return GROUP_NAME;
+    }
+
+    @Override
+    protected final QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
new file mode 100644
index 0000000..5d092f8
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Configuration options for metrics. */
+public class KubernetesOperatorMetricOptions {
+    public static final ConfigOption<String> SCOPE_NAMING_KUBERNETES_OPERATOR =
+            ConfigOptions.key("metrics.scope.kubernetes-operator")
+                    .defaultValue("<host>.kubernetes-operator.<namespace>.<name>")
+                    .withDescription(
+                            "Defines the scope format string that is applied to all metrics scoped to the kubernetes operator.");
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorScopeFormat.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorScopeFormat.java
new file mode 100644
index 0000000..81d2056
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorScopeFormat.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.SCOPE_NAMING_KUBERNETES_OPERATOR;
+
+/** Scope format for metrics of the kubernetes operator. */
+public class KubernetesOperatorScopeFormat extends ScopeFormat {
+
+    public static final String NAMESPACE = asVariable("namespace");
+    public static final String NAME = asVariable("name");
+
+    public KubernetesOperatorScopeFormat(String format) {
+        super(format, null, new String[] {NAMESPACE, NAME, SCOPE_HOST});
+    }
+
+    public String[] formatScope(String namespace, String name, String hostname) {
+        final String[] template = copyTemplate();
+        final String[] values = {namespace, name, hostname}; // same order as constructor variables
+        return bindVariables(template, values);
+    }
+
+    public static KubernetesOperatorScopeFormat fromConfig(Configuration config) {
+        String format = config.getString(SCOPE_NAMING_KUBERNETES_OPERATOR);
+        return new KubernetesOperatorScopeFormat(format);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
new file mode 100644
index 0000000..5dfe6ba
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.ReporterSetup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
+
+/** Utility class for flink based operator metrics. */
+public class OperatorMetricUtils {
+
+    private static final String ENV_HOSTNAME = "HOSTNAME";
+    private static final String ENV_OPERATOR_NAME = "OPERATOR_NAME";
+    private static final String ENV_OPERATOR_NAMESPACE = "OPERATOR_NAMESPACE";
+
+    public static void initOperatorMetrics(Configuration configuration) {
+        PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
+        MetricRegistry metricRegistry = createMetricRegistry(configuration, pluginManager);
+        KubernetesOperatorMetricGroup operatorMetricGroup =
+                KubernetesOperatorMetricGroup.create(
+                        metricRegistry,
+                        configuration,
+                        System.getenv().getOrDefault(ENV_OPERATOR_NAMESPACE, "default"),
+                        System.getenv().getOrDefault(ENV_OPERATOR_NAME, "flink-operator"),
+                        System.getenv().getOrDefault(ENV_HOSTNAME, "localhost"));
+        MetricGroup statusGroup = operatorMetricGroup.addGroup("Status");
+        MetricUtils.instantiateStatusMetrics(statusGroup);
+    }
+
+    private static MetricRegistryImpl createMetricRegistry(
+            Configuration configuration, PluginManager pluginManager) {
+        return new MetricRegistryImpl(
+                MetricRegistryConfiguration.fromConfiguration(configuration, Long.MAX_VALUE),
+                ReporterSetup.fromConfiguration(configuration, pluginManager));
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index e9fbac0..dfcc2ff 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -61,10 +61,7 @@ public class FlinkUtils {
 
         try {
             String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR);
-            Configuration effectiveConfig =
-                    flinkConfDir != null
-                            ? GlobalConfiguration.loadConfiguration(flinkConfDir)
-                            : new Configuration();
+            Configuration effectiveConfig = loadConfiguration(flinkConfDir);
 
             effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
             effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
@@ -167,6 +164,14 @@ public class FlinkUtils {
         }
     }
 
+    /** Loads the Flink configuration from {@code confDir}, or an empty one if it is null. */
+    public static Configuration loadConfiguration(String confDir) {
+        if (confDir == null) {
+            return new Configuration();
+        }
+        return GlobalConfiguration.loadConfiguration(confDir);
+    }
+
     private static String createTempFile(Pod podTemplate) throws IOException {
         File tmp = File.createTempFile("podTemplate_", ".yaml");
         Files.write(tmp.toPath(), SerializationUtils.dumpAsYaml(podTemplate).getBytes());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
new file mode 100644
index 0000000..3d1d510
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroupTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
+import org.junit.Test;
+
+import static org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricOptions.SCOPE_NAMING_KUBERNETES_OPERATOR;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** {@link KubernetesOperatorMetricGroup} tests. */
+public class KubernetesOperatorMetricGroupTest {
+
+    @Test
+    public void testGenerateScopeDefault() throws Exception {
+        Configuration configuration = new Configuration();
+        MetricRegistryImpl registry = new MetricRegistryImpl(fromConfiguration(configuration));
+        KubernetesOperatorMetricGroup group =
+                KubernetesOperatorMetricGroup.create(
+                        registry, configuration, "default", "flink-operator", "localhost");
+        assertArrayEquals(
+                new String[] {"localhost", "kubernetes-operator", "default", "flink-operator"},
+                group.getScopeComponents());
+        assertEquals(
+                "localhost.kubernetes-operator.default.flink-operator.test",
+                group.getMetricIdentifier("test"));
+
+        assertEquals(
+                ImmutableMap.of(
+                        "<host>",
+                        "localhost",
+                        "<namespace>",
+                        "default",
+                        "<name>",
+                        "flink-operator"),
+                group.getAllVariables());
+
+        registry.shutdown().get();
+    }
+
+    @Test
+    public void testGenerateScopeCustom() throws Exception {
+        Configuration configuration = new Configuration();
+        configuration.setString(SCOPE_NAMING_KUBERNETES_OPERATOR, "foo.<host>.<name>.<namespace>");
+        MetricRegistryImpl registry = new MetricRegistryImpl(fromConfiguration(configuration));
+
+        KubernetesOperatorMetricGroup group =
+                KubernetesOperatorMetricGroup.create(
+                        registry, configuration, "default", "flink-operator", "localhost");
+        assertArrayEquals(
+                new String[] {"foo", "localhost", "flink-operator", "default"},
+                group.getScopeComponents());
+        assertEquals(
+                "foo.localhost.flink-operator.default.test", group.getMetricIdentifier("test"));
+
+        assertEquals(
+                ImmutableMap.of(
+                        "<host>",
+                        "localhost",
+                        "<namespace>",
+                        "default",
+                        "<name>",
+                        "flink-operator"),
+                group.getAllVariables());
+
+        registry.shutdown().get();
+    }
+
+    private static MetricRegistryConfiguration fromConfiguration(Configuration configuration) {
+        return MetricRegistryConfiguration.fromConfiguration(configuration, Long.MAX_VALUE);
+    }
+}
diff --git a/helm/flink-operator/templates/flink-operator.yaml b/helm/flink-operator/templates/flink-operator.yaml
index 0d1b414..7225dc0 100644
--- a/helm/flink-operator/templates/flink-operator.yaml
+++ b/helm/flink-operator/templates/flink-operator.yaml
@@ -44,8 +44,16 @@ spec:
           imagePullPolicy: {{ .Values.image.pullPolicy }}
           command: ["/docker-entrypoint.sh", "operator"]
           env:
+            - name: OPERATOR_NAMESPACE
+              value: {{ .Values.operatorNamespace.name }}
+            - name: OPERATOR_NAME
+              value: {{ include "flink-operator.name" . }}
             - name: FLINK_CONF_DIR
               value: /opt/flink/conf
+            - name: FLINK_PLUGINS_DIR
+              value: /opt/flink/plugins
+            - name: FLINK_OPERATOR_CONF_DIR
+              value: /opt/flink-operator/conf
             - name: LOG_CONFIG
               value: -Dlog4j.configurationFile=/opt/flink-operator/conf/log4j2.properties
           volumeMounts:
@@ -90,6 +98,8 @@ spec:
           configMap:
             name: flink-operator-config
             items:
+              - key: flink-conf.yaml
+                path: flink-conf.yaml
               - key: log4j2.properties
                 path: log4j2.properties
         - name: flink-default-config-volume
@@ -118,6 +128,10 @@ metadata:
   labels:
     {{- include "flink-operator.labels" . | nindent 4 }}
 data:
+  flink-conf.yaml: |+
+    metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
+    metrics.reporter.slf4j.interval: 1 MINUTE
+    # metrics.scope.kubernetes-operator: <host>.kubernetes-operator.<namespace>.<name>
   log4j2.properties: |+
     rootLogger.level = DEBUG
     rootLogger.appenderRef.console.ref = ConsoleAppender