Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/14 07:35:16 UTC

[GitHub] [flink-kubernetes-operator] SteNicholas opened a new pull request, #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

SteNicholas opened a new pull request, #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268

   The Java Operator SDK ships with an internal `Metrics` interface that can be implemented to forward metrics and measurements to the Flink metric registries.
   
   **Brief change log**
   - Introduces `FlinkOperatorMetrics`, an implementation of `Metrics` that monitors the operations of the `Operator` and forwards metrics to the `MetricRegistry`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#discussion_r897218837


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/FlinkOperatorMetrics.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+
+import io.javaoperatorsdk.operator.Operator;
+import io.javaoperatorsdk.operator.api.monitoring.Metrics;
+import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
+import io.javaoperatorsdk.operator.processing.event.Event;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of {@link Metrics} to monitor the operations of {@link Operator} and forward
+ * metrics to {@link MetricRegistry}.
+ */
+public class FlinkOperatorMetrics implements Metrics {
+
+    private static final String RECONCILIATIONS = "reconciliations.";
+    private final MetricGroup metricGroup;
+    private final Map<String, Counter> counters;
+
+    public FlinkOperatorMetrics(MetricGroup metricGroup) {
+        this.metricGroup = metricGroup;
+        this.counters = new ConcurrentHashMap<>();
+    }
+
+    public <T> T timeControllerExecution(ControllerExecution<T> execution) throws Exception {
+        String executionMetric =
+                String.format(
+                        "controller.%s.execution.%s", execution.controllerName(), execution.name());
+        counter(executionMetric);
+        try {
+            T result = execution.execute();
+            counter(executionMetric + ".success");
+            counter(executionMetric + ".success." + execution.successTypeName(result));
+            return result;
+        } catch (Exception e) {
+            counter(executionMetric + ".failure");
+            counter(executionMetric + ".failure." + e.getClass().getSimpleName());
+            throw e;
+        }
+    }
+
+    public void receivedEvent(Event event) {
+        incrementCounter(
+                event.getRelatedCustomResourceID(),
+                "events.received",
+                "event",
+                event.getClass().getSimpleName());
+    }
+
+    @Override
+    public void cleanupDoneFor(ResourceID resourceID) {
+        incrementCounter(resourceID, "events.delete");
+    }
+
+    @Override
+    public void reconcileCustomResource(ResourceID resourceID, RetryInfo retryInfoNullable) {
+        Optional<RetryInfo> retryInfo = Optional.ofNullable(retryInfoNullable);
+        incrementCounter(
+                resourceID,
+                RECONCILIATIONS + "started",
+                RECONCILIATIONS + "retries.number",
+                "" + retryInfo.map(RetryInfo::getAttemptCount).orElse(0),
+                RECONCILIATIONS + "retries.last",
+                "" + retryInfo.map(RetryInfo::isLastAttempt).orElse(true));
+    }
+
+    @Override
+    public void finishedReconciliation(ResourceID resourceID) {
+        incrementCounter(resourceID, RECONCILIATIONS + "success");
+    }
+
+    public void failedReconciliation(ResourceID resourceID, Exception exception) {
+        var cause = exception.getCause();
+        if (cause == null) {
+            cause = exception;
+        } else if (cause instanceof RuntimeException) {
+            cause = cause.getCause() != null ? cause.getCause() : cause;
+        }
+        incrementCounter(
+                resourceID,
+                RECONCILIATIONS + "failed",
+                "exception",
+                cause.getClass().getSimpleName());
+    }
+
+    public <T extends Map<?, ?>> T monitorSizeOf(T map, String name) {
+        metricGroup.gauge(name + ".size", map::size);
+        return map;
+    }
+
+    private void incrementCounter(ResourceID id, String counterName, String... additionalTags) {

Review Comment:
   I think tags should be metric groups so that this integrates well with the reporters.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java:
##########
@@ -71,36 +70,34 @@ public class FlinkOperator {
     private final Set<FlinkResourceValidator> validators;
     private final MetricGroup metricGroup;
 
+    private static final String OPERATOR_SDK_GROUP = "operator.sdk";
+
     public FlinkOperator(@Nullable Configuration conf) {
         this.client = new DefaultKubernetesClient();
         this.configManager = conf != null ? new FlinkConfigManager(conf) : new FlinkConfigManager();
-        this.operator =
-                new Operator(
-                        client,
-                        getConfigurationServiceOverriderConsumer(
-                                configManager.getOperatorConfiguration()));
-        this.flinkService = new FlinkService(client, configManager);
-        this.validators = ValidatorUtils.discoverValidators(configManager);
         this.metricGroup =
                 OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
+        this.operator = new Operator(client, getConfigurationServiceOverriderConsumer());
+        this.flinkService = new FlinkService(client, configManager);
+        this.validators = ValidatorUtils.discoverValidators(configManager);
         PluginManager pluginManager =
                 PluginUtils.createPluginManagerFromRootFolder(configManager.getDefaultConfig());
         FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
     }
 
-    @VisibleForTesting
-    protected static Consumer<ConfigurationServiceOverrider>
-            getConfigurationServiceOverriderConsumer(
-                    FlinkOperatorConfiguration operatorConfiguration) {
+    private Consumer<ConfigurationServiceOverrider> getConfigurationServiceOverriderConsumer() {
         return overrider -> {
-            int parallelism = operatorConfiguration.getReconcilerMaxParallelism();
+            int parallelism =
+                    configManager.getOperatorConfiguration().getReconcilerMaxParallelism();
             if (parallelism == -1) {
                 LOG.info("Configuring operator with unbounded reconciliation thread pool.");
                 overrider.withExecutorService(Executors.newCachedThreadPool());
             } else {
                 LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
                 overrider.withConcurrentReconciliationThreads(parallelism);
             }
+            overrider.withMetrics(
+                    new FlinkOperatorMetrics(metricGroup.addGroup(OPERATOR_SDK_GROUP)));

Review Comment:
   I think this should be configurable: `kubernetes.operator.josdk.metrics.enabled : true/false`
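
A minimal sketch of how such a switch might gate the registration, assuming a plain `Map<String, String>` in place of Flink's `Configuration`. The class `JosdkMetricsToggle` and its methods are hypothetical; only the option key comes from the comment above, and the default of `true` is an assumption.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

final class JosdkMetricsToggle {

    // Option key as proposed in the review comment.
    static final String KEY = "kubernetes.operator.josdk.metrics.enabled";

    /** Reads the flag from a plain map; an absent key is treated as enabled (assumed default). */
    static boolean enabled(Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault(KEY, "true"));
    }

    /**
     * Builds the metrics object only when the flag is on, so nothing is
     * created or registered when JOSDK metrics are disabled.
     */
    static <M> Optional<M> metricsToRegister(Map<String, String> conf, Supplier<M> factory) {
        return enabled(conf) ? Optional.of(factory.get()) : Optional.empty();
    }
}
```

In the operator, the presence of the returned value could decide whether `overrider.withMetrics(...)` is called at all.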





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1177748816

   Closing this in favor of https://github.com/apache/flink-kubernetes-operator/pull/302
   




[GitHub] [flink-kubernetes-operator] SteNicholas commented on a diff in pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#discussion_r896625394


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java:
##########
@@ -81,59 +80,121 @@ public FlinkDeploymentController(
             ReconcilerFactory reconcilerFactory,
             ObserverFactory observerFactory,
             MetricManager<FlinkDeployment> metricManager,
+            Metrics metrics,
             StatusHelper<FlinkDeploymentStatus> statusHelper) {
         this.configManager = configManager;
         this.kubernetesClient = kubernetesClient;
         this.validators = validators;
         this.reconcilerFactory = reconcilerFactory;
         this.observerFactory = observerFactory;
         this.metricManager = metricManager;
+        this.metrics = metrics;
         this.statusHelper = statusHelper;
     }
 
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {

Review Comment:
   @morhidi, the `Metrics` should be integrated into `FlinkDeploymentController` and `FlinkSessionController`, as is done in the `Controller` above. Is there any misunderstanding?





[GitHub] [flink-kubernetes-operator] SteNicholas commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1168414964

   @gyfora, the following metrics are produced by `kubectl apply -f example/basic.yaml`:
   ```
   -- Counters -------------------------------------------------------------------
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_success.resource_basic-example.namespace_default.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_ResourceEvent.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.reconciliations_retries_number_0_reconciliations_retries_last_true.counter_reconciliations_started.resource_basic-example.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_ResourceEvent.counter_events_received.resource_basic-example.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_events_received.count: 6
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_started.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_Event.counter_events_received.resource_basic-example.namespace_default.scope_namespace.count: 4
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_events_received.resource_basic-example.count: 6
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_started.resource_basic-example.namespace_default.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_Event.counter_events_received.resource_basic-example.namespace_default.count: 4
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.controller.flinkdeploymentcontroller.execution.reconcile.success.resource.count: 1
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_events_received.resource_basic-example.namespace_default.count: 6
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.reconciliations_retries_number_0_reconciliations_retries_last_true.counter_reconciliations_started.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_started.resource_basic-example.namespace_default.scope_namespace.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_ResourceEvent.counter_events_received.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_success.resource_basic-example.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.reconciliations_retries_number_0_reconciliations_retries_last_true.counter_reconciliations_started.resource_basic-example.namespace_default.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_Event.counter_events_received.resource_basic-example.count: 4
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_ResourceEvent.counter_events_received.resource_basic-example.namespace_default.scope_namespace.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.controller.flinkdeploymentcontroller.execution.reconcile.success.count: 1
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_success.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.reconciliations_retries_number_0_reconciliations_retries_last_true.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_Event.counter_events_received.count: 4
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.reconciliations_retries_number_0_reconciliations_retries_last_true.counter_reconciliations_started.resource_basic-example.namespace_default.scope_namespace.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.controller.flinkdeploymentcontroller.execution.reconcile.count: 1
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_events_received.resource_basic-example.namespace_default.scope_namespace.count: 6
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_ResourceEvent.counter_events_received.resource_basic-example.namespace_default.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_success.resource_basic-example.namespace_default.scope_namespace.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_started.resource_basic-example.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_Event.count: 4
   ```




[GitHub] [flink-kubernetes-operator] SteNicholas commented on a diff in pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#discussion_r896626852


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java:
##########
@@ -70,37 +70,36 @@ public class FlinkOperator {
     private final FlinkConfigManager configManager;
     private final Set<FlinkResourceValidator> validators;
     private final MetricGroup metricGroup;
+    private final Metrics metrics;
+
+    private static final String OPERATOR_SDK_GROUP = "operator.sdk";
 
     public FlinkOperator(@Nullable Configuration conf) {
         this.client = new DefaultKubernetesClient();
         this.configManager = conf != null ? new FlinkConfigManager(conf) : new FlinkConfigManager();
-        this.operator =
-                new Operator(
-                        client,
-                        getConfigurationServiceOverriderConsumer(
-                                configManager.getOperatorConfiguration()));
-        this.flinkService = new FlinkService(client, configManager);
-        this.validators = ValidatorUtils.discoverValidators(configManager);
         this.metricGroup =
                 OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
+        this.metrics = new FlinkOperatorMetrics(metricGroup.addGroup(OPERATOR_SDK_GROUP));
+        this.operator = new Operator(client, getConfigurationServiceOverriderConsumer());
+        this.flinkService = new FlinkService(client, configManager);
+        this.validators = ValidatorUtils.discoverValidators(configManager);
         PluginManager pluginManager =
                 PluginUtils.createPluginManagerFromRootFolder(configManager.getDefaultConfig());
         FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
     }
 
-    @VisibleForTesting
-    protected static Consumer<ConfigurationServiceOverrider>
-            getConfigurationServiceOverriderConsumer(
-                    FlinkOperatorConfiguration operatorConfiguration) {
+    private Consumer<ConfigurationServiceOverrider> getConfigurationServiceOverriderConsumer() {
         return overrider -> {
-            int parallelism = operatorConfiguration.getReconcilerMaxParallelism();
+            int parallelism =
+                    configManager.getOperatorConfiguration().getReconcilerMaxParallelism();
             if (parallelism == -1) {
                 LOG.info("Configuring operator with unbounded reconciliation thread pool.");
                 overrider.withExecutorService(Executors.newCachedThreadPool());
             } else {
                 LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
                 overrider.withConcurrentReconciliationThreads(parallelism);
             }
+            overrider.withMetrics(metrics);

Review Comment:
   @morhidi, IMO the `Metrics` should be passed to the overrider here so that it gets registered.





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1162800041

   I think your implementation still doesn't use the groups as intended.
   
   For each tag you can do:
   ```
   addGroup(tagName, tagValue)
   ```
   
   We should probably cache these by the tags
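
A rough sketch of that suggestion, assuming one nested group per tag pair and a cache keyed by counter name plus tags. `SketchGroup` is a hypothetical stand-in that only imitates the shape of `addGroup(tagName, tagValue)`; it is not Flink's `MetricGroup`, and `TaggedCounters` is likewise illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

final class TaggedCounters {

    /** Hypothetical stand-in for a metric group; NOT Flink's MetricGroup. */
    static final class SketchGroup {
        private final String path;
        private final Map<String, SketchGroup> children = new ConcurrentHashMap<>();
        private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

        SketchGroup(String path) {
            this.path = path;
        }

        /** One nested key/value group per tag, created on first use and reused afterwards. */
        SketchGroup addGroup(String key, String value) {
            String child = key + "." + value;
            return children.computeIfAbsent(child, c -> new SketchGroup(path + "." + c));
        }

        LongAdder counter(String name) {
            return counters.computeIfAbsent(name, n -> new LongAdder());
        }

        String path() {
            return path;
        }
    }

    private final SketchGroup root;
    // Cache so the group chain is walked only once per (counter, tags) combination.
    private final Map<String, LongAdder> cache = new ConcurrentHashMap<>();

    TaggedCounters(SketchGroup root) {
        this.root = root;
    }

    void increment(String counterName, String... tags) {
        cache.computeIfAbsent(cacheKey(counterName, tags), k -> resolve(counterName, tags))
                .increment();
    }

    long count(String counterName, String... tags) {
        LongAdder counter = cache.get(cacheKey(counterName, tags));
        return counter == null ? 0L : counter.sum();
    }

    /** Walks root -> tag group -> tag group ... and registers the counter on the last group. */
    private LongAdder resolve(String counterName, String... tags) {
        SketchGroup group = root;
        for (int i = 0; i + 1 < tags.length; i += 2) {
            group = group.addGroup(tags[i], tags[i + 1]);
        }
        return group.counter(counterName);
    }

    private static String cacheKey(String counterName, String... tags) {
        if (tags.length % 2 != 0) {
            throw new IllegalArgumentException("tags must be key/value pairs");
        }
        return counterName + "|" + String.join("|", tags);
    }
}
```

With this shape, each tag becomes its own group level, which is what lets a reporter treat tags as labels instead of folding them into one long name.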




[GitHub] [flink-kubernetes-operator] SteNicholas commented on a diff in pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#discussion_r903447982


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -203,4 +203,11 @@ public class KubernetesOperatorConfigOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Enables dynamic change of watched/monitored namespaces. Defaults to false");
+
+    public static final ConfigOption<Boolean> OPERATOR_JOSDK_METRICS_ENABLED =
+            ConfigOptions.key("kubernetes.operator.josdk.metrics.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Enables that Metrics of Java Operator SDK forwards metrics to the Flink metric registries . Defaults to true");

Review Comment:
   @gyfora, config options such as `kubernetes.operator.watched.namespaces` and `kubernetes.operator.dynamic.namespaces.enabled` include the default value in their descriptions. Do I need to update the descriptions of these options as well?





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#discussion_r903452192


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -203,4 +203,11 @@ public class KubernetesOperatorConfigOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Enables dynamic change of watched/monitored namespaces. Defaults to false");
+
+    public static final ConfigOption<Boolean> OPERATOR_JOSDK_METRICS_ENABLED =
+            ConfigOptions.key("kubernetes.operator.josdk.metrics.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Enables that Metrics of Java Operator SDK forwards metrics to the Flink metric registries . Defaults to true");

Review Comment:
   Don't worry about the others for now; Matyas will fix some of those.





[GitHub] [flink-kubernetes-operator] SteNicholas commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1154980775

   > Hi @SteNicholas thanks for opening a PR for this feature. Could you please add a few examples how the new metrics would look? I found some confusing parts, added some comments. Could you please clarify?
   
   @morhidi, I would like to clarify the above comments.




[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1168493708

   First of all I sorted the metrics, to figure out what's going on :)
   
   ```
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.controller.flinkdeploymentcontroller.execution.reconcile.count: 1
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.controller.flinkdeploymentcontroller.execution.reconcile.success.count: 1
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.controller.flinkdeploymentcontroller.execution.reconcile.success.resource.count: 1
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_events_received.count: 6
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_events_received.resource_basic-example.count: 6
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_events_received.resource_basic-example.namespace_default.count: 6
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_events_received.resource_basic-example.namespace_default.scope_namespace.count: 6
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_started.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_started.resource_basic-example.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_started.resource_basic-example.namespace_default.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_started.resource_basic-example.namespace_default.scope_namespace.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_success.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_success.resource_basic-example.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_success.resource_basic-example.namespace_default.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.counter_reconciliations_success.resource_basic-example.namespace_default.scope_namespace.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_Event.count: 4
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_Event.counter_events_received.count: 4
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_Event.counter_events_received.resource_basic-example.count: 4
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_Event.counter_events_received.resource_basic-example.namespace_default.count: 4
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_Event.counter_events_received.resource_basic-example.namespace_default.scope_namespace.count: 4
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_ResourceEvent.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_ResourceEvent.counter_events_received.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_ResourceEvent.counter_events_received.resource_basic-example.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_ResourceEvent.counter_events_received.resource_basic-example.namespace_default.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.event_ResourceEvent.counter_events_received.resource_basic-example.namespace_default.scope_namespace.count: 2
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.reconciliations_retries_number_0_reconciliations_retries_last_true.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.reconciliations_retries_number_0_reconciliations_retries_last_true.counter_reconciliations_started.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.reconciliations_retries_number_0_reconciliations_retries_last_true.counter_reconciliations_started.resource_basic-example.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.reconciliations_retries_number_0_reconciliations_retries_last_true.counter_reconciliations_started.resource_basic-example.namespace_default.count: 3
   flink-kubernetes-operator-59fb9cfd78-gh69c.k8soperator.default.flink-kubernetes-operator.operator_sdk.reconciliations_retries_number_0_reconciliations_retries_last_true.counter_reconciliations_started.resource_basic-example.namespace_default.scope_namespace.count: 3
   ```




[GitHub] [flink-kubernetes-operator] gyfora closed pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
gyfora closed pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter
URL: https://github.com/apache/flink-kubernetes-operator/pull/268




[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1168420562

   @SteNicholas @morhidi I think we need to rethink how these are exposed, because these metric names are confusing and highly redundant.
   
   A few examples:
   ```
   event_Event.counter_events_received.count
   reconciliations_retries_number_0_reconciliations_retries_last_true.count
   event_ResourceEvent.counter_events_received.resource_basic-example.namespace_default.count
   ```
   
   Maybe we need to add some custom logic to make these names clean and useful.




[GitHub] [flink-kubernetes-operator] SteNicholas commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1168444692

   @morhidi, do you have any recommendations for the metric names?




[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1168644559

   We should eliminate the redundant parts of these metric names and not append category information to the name using `_`. If we need to add a category, that's a new metric group.




[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1155009718

   Here is an example for the Metrics implementation: 
   https://github.com/java-operator-sdk/java-operator-sdk/blob/main/micrometer-support/src/main/java/io/javaoperatorsdk/operator/monitoring/micrometer/MicrometerMetrics.java




[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1155010729

   We need nothing more here




[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#discussion_r896612072


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java:
##########
@@ -81,59 +80,121 @@ public FlinkDeploymentController(
             ReconcilerFactory reconcilerFactory,
             ObserverFactory observerFactory,
             MetricManager<FlinkDeployment> metricManager,
+            Metrics metrics,
             StatusHelper<FlinkDeploymentStatus> statusHelper) {
         this.configManager = configManager;
         this.kubernetesClient = kubernetesClient;
         this.validators = validators;
         this.reconcilerFactory = reconcilerFactory;
         this.observerFactory = observerFactory;
         this.metricManager = metricManager;
+        this.metrics = metrics;
         this.statusHelper = statusHelper;
     }
 
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
-        LOG.info("Deleting FlinkDeployment");
-        statusHelper.updateStatusFromCache(flinkApp);
         try {
-            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
-        } catch (DeploymentFailedException dfe) {
-            // ignore during cleanup
+            return metrics.timeControllerExecution(
+                    new Metrics.ControllerExecution<>() {
+                        @Override
+                        public String name() {
+                            return "cleanup";
+                        }
+
+                        @Override
+                        public String controllerName() {
+                            return FlinkDeploymentController.class.getSimpleName();
+                        }
+
+                        @Override
+                        public String successTypeName(DeleteControl deleteControl) {
+                            return deleteControl.isRemoveFinalizer()
+                                    ? "delete"
+                                    : "finalizerNotRemoved";
+                        }
+
+                        @Override
+                        public DeleteControl execute() {
+                            LOG.info("Deleting FlinkDeployment");
+                            statusHelper.updateStatusFromCache(flinkApp);
+                            try {
+                                observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
+                            } catch (DeploymentFailedException dfe) {
+                                // ignore during cleanup
+                            }
+                            metricManager.onRemove(flinkApp);
+                            statusHelper.removeCachedStatus(flinkApp);
+                            return reconcilerFactory
+                                    .getOrCreate(flinkApp)
+                                    .cleanup(flinkApp, context);
+                        }
+                    });
+        } catch (Exception e) {
+            throw new OperatorException(e);
         }
-        metricManager.onRemove(flinkApp);
-        statusHelper.removeCachedStatus(flinkApp);
-        return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, context);
     }
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context)

Review Comment:
   ditto: https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java#L151



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java:
##########
@@ -70,37 +70,36 @@ public class FlinkOperator {
     private final FlinkConfigManager configManager;
     private final Set<FlinkResourceValidator> validators;
     private final MetricGroup metricGroup;
+    private final Metrics metrics;
+
+    private static final String OPERATOR_SDK_GROUP = "operator.sdk";
 
     public FlinkOperator(@Nullable Configuration conf) {
         this.client = new DefaultKubernetesClient();
         this.configManager = conf != null ? new FlinkConfigManager(conf) : new FlinkConfigManager();
-        this.operator =
-                new Operator(
-                        client,
-                        getConfigurationServiceOverriderConsumer(
-                                configManager.getOperatorConfiguration()));
-        this.flinkService = new FlinkService(client, configManager);
-        this.validators = ValidatorUtils.discoverValidators(configManager);
         this.metricGroup =
                 OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
+        this.metrics = new FlinkOperatorMetrics(metricGroup.addGroup(OPERATOR_SDK_GROUP));
+        this.operator = new Operator(client, getConfigurationServiceOverriderConsumer());
+        this.flinkService = new FlinkService(client, configManager);
+        this.validators = ValidatorUtils.discoverValidators(configManager);
         PluginManager pluginManager =
                 PluginUtils.createPluginManagerFromRootFolder(configManager.getDefaultConfig());
         FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
     }
 
-    @VisibleForTesting
-    protected static Consumer<ConfigurationServiceOverrider>
-            getConfigurationServiceOverriderConsumer(
-                    FlinkOperatorConfiguration operatorConfiguration) {
+    private Consumer<ConfigurationServiceOverrider> getConfigurationServiceOverriderConsumer() {
         return overrider -> {
-            int parallelism = operatorConfiguration.getReconcilerMaxParallelism();
+            int parallelism =
+                    configManager.getOperatorConfiguration().getReconcilerMaxParallelism();
             if (parallelism == -1) {
                 LOG.info("Configuring operator with unbounded reconciliation thread pool.");
                 overrider.withExecutorService(Executors.newCachedThreadPool());
             } else {
                 LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
                 overrider.withConcurrentReconciliationThreads(parallelism);
             }
+            overrider.withMetrics(metrics);

Review Comment:
   I expected that we would only need to implement the `Metrics` interface and register it here, and that the rest would be handled by the JOSDK framework.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java:
##########
@@ -81,59 +80,121 @@ public FlinkDeploymentController(
             ReconcilerFactory reconcilerFactory,
             ObserverFactory observerFactory,
             MetricManager<FlinkDeployment> metricManager,
+            Metrics metrics,
             StatusHelper<FlinkDeploymentStatus> statusHelper) {
         this.configManager = configManager;
         this.kubernetesClient = kubernetesClient;
         this.validators = validators;
         this.reconcilerFactory = reconcilerFactory;
         this.observerFactory = observerFactory;
         this.metricManager = metricManager;
+        this.metrics = metrics;
         this.statusHelper = statusHelper;
     }
 
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {

Review Comment:
   I don't really understand this part. This should already be handled at the JOSDK level, shouldn't it?
   https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java#L109





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1169736823

   Also, please let us know if you feel lost in this logic; @morhidi and I can help modify your logic to fit what's currently in Flink and the operator.




[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1168659447

   Some suggestions:
   - There's no need for the `counter_` prefix when the metric name already ends in `count`.
   - Use the order namespace->flinkdeployment
   - Use `JOSDK` instead of `operator_sdk` to bring some relief to the tired eyes
   
   Use the existing metrics as an example; there we have `resourcens` to differentiate the resource namespace from the operator namespace:
   ```
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.Memory.Mapped.MemoryUsed: 0
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.resourcens.default.FlinkDeployment.DEPLOYED_NOT_READY.Count: 0
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.CPU.Time: 207210000000
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.Memory.Metaspace.Used: 58601696
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.Memory.Metaspace.Max: -1
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.Memory.Heap.Committed: 331350016
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.GarbageCollector.G1 Young Generation.Count: 8
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.Memory.Direct.TotalCapacity: 16778334
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.resourcens.default.FlinkDeployment.READY.Count: 1
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.Memory.Direct.MemoryUsed: 16778335
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.Memory.Heap.Used: 67083776
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.resourcens.default.FlinkDeployment.Count: 1
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.Memory.NonHeap.Max: -1
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.resourcens.default.FlinkDeployment.DEPLOYING.Count: 0
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.resourcens.default.FlinkDeployment.ERROR.Count: 0
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.Memory.Direct.Count: 5
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.GarbageCollector.G1 Old Generation.Time: 0
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.Memory.Metaspace.Committed: 60289024
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.ClassLoader.ClassesLoaded: 10073
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.Memory.NonHeap.Committed: 97460224
   flink-kubernetes-operator-64d8cc77c4-w49nj.k8soperator.default.flink-kubernetes-operator.Status.JVM.Memory.NonHeap.Used: 93806040
   
   ```
   
   




[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1168651437

   @gyfora has a good point here. We could probably use this approach:
   ```
       /**
        * Creates a new key-value MetricGroup pair. The key group is added to this groups sub-groups,
        * while the value group is added to the key group's sub-groups. This method returns the value
        * group.
        *
        * <p>The only difference between calling this method and {@code
        * group.addGroup(key).addGroup(value)} is that {@link #getAllVariables()} of the value group
        * return an additional {@code "<key>"="value"} pair.
        *
        * @param key name of the first group
        * @param value name of the second group
        * @return the second created group
        */
       MetricGroup addGroup(String key, String value);
   ```
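
To make the difference described in that Javadoc concrete, here is a minimal, self-contained sketch. `KeyValueGroupSketch` is a hypothetical stand-in written only for illustration, not Flink's `MetricGroup`. It models just the behavior the Javadoc states: both forms produce the same scope, but the key-value form also records a `<key>` variable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a metric group; NOT Flink's MetricGroup implementation.
final class KeyValueGroupSketch {
    private final List<String> scope;
    private final Map<String, String> variables;

    private KeyValueGroupSketch(List<String> scope, Map<String, String> variables) {
        this.scope = scope;
        this.variables = variables;
    }

    static KeyValueGroupSketch root(String name) {
        return new KeyValueGroupSketch(
                Collections.singletonList(name), Collections.<String, String>emptyMap());
    }

    // Plain sub-group: extends the scope by one level, variables stay unchanged.
    KeyValueGroupSketch addGroup(String name) {
        List<String> nextScope = new ArrayList<>(scope);
        nextScope.add(name);
        return new KeyValueGroupSketch(nextScope, variables);
    }

    // Key-value pair: extends the scope by two levels and records "<key>" -> value.
    KeyValueGroupSketch addGroup(String key, String value) {
        List<String> nextScope = new ArrayList<>(scope);
        nextScope.add(key);
        nextScope.add(value);
        Map<String, String> nextVariables = new LinkedHashMap<>(variables);
        nextVariables.put("<" + key + ">", value);
        return new KeyValueGroupSketch(nextScope, nextVariables);
    }

    // Dotted scope, in the same style as the reporter output in this thread.
    String scopeString() {
        return String.join(".", scope);
    }

    // Defensive copy of the variables collected along the path.
    Map<String, String> allVariables() {
        return new LinkedHashMap<>(variables);
    }

    public static void main(String[] args) {
        KeyValueGroupSketch kv = root("JOSDK").addGroup("resourcens", "default");
        KeyValueGroupSketch plain = root("JOSDK").addGroup("resourcens").addGroup("default");
        System.out.println(kv.scopeString() + " " + kv.allVariables());
        System.out.println(plain.scopeString() + " " + plain.allVariables());
    }
}
```

Both groups share the scope `JOSDK.resourcens.default`; only the key-value variant carries a variable, which is presumably what tag-based reporters would turn into labels.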




[GitHub] [flink-kubernetes-operator] SteNicholas commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1155073625

   > This look way better, I'll try it locally later. We could probably add the documentation for the new metrics in this PR too.
   
   @morhidi, I will add the documentation later.




[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1155010266

   And how it is used:
   https://github.com/java-operator-sdk/java-operator-sdk/blob/99b5ecd92b7dafb09792a65198777f0d29405aee/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/MySQLSchemaOperator.java#L32




[GitHub] [flink-kubernetes-operator] SteNicholas commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1155031311

   > Here is an example for the Metrics implementation: https://github.com/java-operator-sdk/java-operator-sdk/blob/main/micrometer-support/src/main/java/io/javaoperatorsdk/operator/monitoring/micrometer/MicrometerMetrics.java
   
   @morhidi, I have referred to the implementation of the `MicrometerMetrics`.




[GitHub] [flink-kubernetes-operator] SteNicholas commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1170791818

   @gyfora, @morhidi, thanks for your suggestions. I will address the above comments today.




[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1169733706

   I agree with what @morhidi said.
   
   Let's look at some concrete examples:
   ```
   1. operator_sdk.event_ResourceEvent.count: 2
   2. operator_sdk.event_ResourceEvent.counter_events_received.count: 2
   3. operator_sdk.event_ResourceEvent.counter_events_received.resource_basic-example.count: 2
   4. operator_sdk.event_ResourceEvent.counter_events_received.resource_basic-example.namespace_default.count: 2
   5. operator_sdk.event_ResourceEvent.counter_events_received.resource_basic-example.namespace_default.scope_namespace.count: 2
   ```
   
   These are all redundant and contain unnecessary levels. It should probably be:
   ```
   1. JOSDK.ResourceEvent.received.count: 2
   2. JOSDK.ResourceEvent.received.resourcens.default.resource.basic-example.count: 2
   ```
   
   
   Another example:
   ```
   1. operator_sdk.counter_reconciliations_started.count: 3
   2. operator_sdk.counter_reconciliations_started.resource_basic-example.count: 3
   3. operator_sdk.counter_reconciliations_started.resource_basic-example.namespace_default.count: 3
   4. operator_sdk.counter_reconciliations_started.resource_basic-example.namespace_default.scope_namespace.count: 3
   ```
   should be:
   ```
   1. JOSDK.reconciliation.started.count: 3
   2. JOSDK.reconciliation.started.resourcens.default.resource.basic-example.count: 3
   ```
   
   It is very important to use the groups correctly instead of creating these random `_` connected names
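
The grouping idea above can be sketched without any Flink dependency. In the following illustration, `ScopeSketch` is a hypothetical class (not Flink's `MetricGroup`), and the group names are taken from the proposed output above. Each category becomes its own level instead of a `_`-joined fragment of the name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a hierarchical metric group; NOT Flink's MetricGroup.
final class ScopeSketch {
    private final List<String> components;

    private ScopeSketch(List<String> components) {
        this.components = components;
    }

    static ScopeSketch root(String name) {
        return new ScopeSketch(Collections.singletonList(name));
    }

    // Returns a child group; each call adds exactly one level to the scope.
    ScopeSketch addGroup(String name) {
        List<String> next = new ArrayList<>(components);
        next.add(name);
        return new ScopeSketch(next);
    }

    // Joins the scope and the metric name with dots, as in the listings above.
    String identifier(String metricName) {
        return String.join(".", components) + "." + metricName;
    }

    public static void main(String[] args) {
        // Category levels are groups, not name fragments.
        ScopeSketch received = root("JOSDK").addGroup("ResourceEvent").addGroup("received");
        System.out.println(received.identifier("count"));

        // Per-resource metric: namespace first, then the resource name.
        ScopeSketch perResource =
                received.addGroup("resourcens")
                        .addGroup("default")
                        .addGroup("resource")
                        .addGroup("basic-example");
        System.out.println(perResource.identifier("count"));
    }
}
```

In the operator itself the same shape would presumably be built with the `addGroup` calls of Flink's `MetricGroup` that already appear in this PR, rather than with a custom class like this one.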




[GitHub] [flink-kubernetes-operator] SteNicholas commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1168637878

   @morhidi, IMO, the current metric names are easier for users to understand, because the part before the first `_` represents the category of the metric. If we remove this part of the metric name, the metrics become confusing for end users.
   WDYT?




[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1164476160

   It would be great if you could provide a list of metrics produced (including tags) and the example output using the slf4j metric reporter for us to see.
   
   I still think there are a few strange parts of the code but I cannot spend so much time going over this again and again without a clear expectation of the output.




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#discussion_r903434917


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -203,4 +203,11 @@ public class KubernetesOperatorConfigOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Enables dynamic change of watched/monitored namespaces. Defaults to false");
+
+    public static final ConfigOption<Boolean> OPERATOR_JOSDK_METRICS_ENABLED =
+            ConfigOptions.key("kubernetes.operator.josdk.metrics.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Enables that Metrics of Java Operator SDK forwards metrics to the Flink metric registries . Defaults to true");

Review Comment:
   Please do not include the default value in the description. It will already be part of the generated HTML.





[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#discussion_r896628606


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java:
##########
@@ -81,59 +80,121 @@ public FlinkDeploymentController(
             ReconcilerFactory reconcilerFactory,
             ObserverFactory observerFactory,
             MetricManager<FlinkDeployment> metricManager,
+            Metrics metrics,
             StatusHelper<FlinkDeploymentStatus> statusHelper) {
         this.configManager = configManager;
         this.kubernetesClient = kubernetesClient;
         this.validators = validators;
         this.reconcilerFactory = reconcilerFactory;
         this.observerFactory = observerFactory;
         this.metricManager = metricManager;
+        this.metrics = metrics;
         this.statusHelper = statusHelper;
     }
 
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {

Review Comment:
   I mean that the JOSDK does this automatically for every controller, so there's no need to do it explicitly.





[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#discussion_r896644897


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java:
##########
@@ -70,37 +70,36 @@ public class FlinkOperator {
     private final FlinkConfigManager configManager;
     private final Set<FlinkResourceValidator> validators;
     private final MetricGroup metricGroup;
+    private final Metrics metrics;
+
+    private static final String OPERATOR_SDK_GROUP = "operator.sdk";
 
     public FlinkOperator(@Nullable Configuration conf) {
         this.client = new DefaultKubernetesClient();
         this.configManager = conf != null ? new FlinkConfigManager(conf) : new FlinkConfigManager();
-        this.operator =
-                new Operator(
-                        client,
-                        getConfigurationServiceOverriderConsumer(
-                                configManager.getOperatorConfiguration()));
-        this.flinkService = new FlinkService(client, configManager);
-        this.validators = ValidatorUtils.discoverValidators(configManager);
         this.metricGroup =
                 OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
+        this.metrics = new FlinkOperatorMetrics(metricGroup.addGroup(OPERATOR_SDK_GROUP));
+        this.operator = new Operator(client, getConfigurationServiceOverriderConsumer());
+        this.flinkService = new FlinkService(client, configManager);
+        this.validators = ValidatorUtils.discoverValidators(configManager);
         PluginManager pluginManager =
                 PluginUtils.createPluginManagerFromRootFolder(configManager.getDefaultConfig());
         FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
     }
 
-    @VisibleForTesting
-    protected static Consumer<ConfigurationServiceOverrider>
-            getConfigurationServiceOverriderConsumer(
-                    FlinkOperatorConfiguration operatorConfiguration) {
+    private Consumer<ConfigurationServiceOverrider> getConfigurationServiceOverriderConsumer() {
         return overrider -> {
-            int parallelism = operatorConfiguration.getReconcilerMaxParallelism();
+            int parallelism =
+                    configManager.getOperatorConfiguration().getReconcilerMaxParallelism();
             if (parallelism == -1) {
                 LOG.info("Configuring operator with unbounded reconciliation thread pool.");
                 overrider.withExecutorService(Executors.newCachedThreadPool());
             } else {
                 LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
                 overrider.withConcurrentReconciliationThreads(parallelism);
             }
+            overrider.withMetrics(metrics);

Review Comment:
   I added the following to the code, and nothing else:
   
   ```
   overrider.withMetrics(new Metrics() {
       @Override
       public void receivedEvent(Event event) {
           LOG.info("receivedEvent() called");
       }
   
       @Override
       public void reconcileCustomResource(ResourceID resourceID, RetryInfo retryInfo) {
           LOG.info("reconcileCustomResource() called");
       }
   
       @Override
       public void failedReconciliation(ResourceID resourceID, Exception exception) {
           LOG.info("failedReconciliation() called");
       }
   
       @Override
       public void cleanupDoneFor(ResourceID resourceID) {
           LOG.info("cleanupDoneFor() called");
       }
   
       @Override
       public void finishedReconciliation(ResourceID resourceID) {
           LOG.info("finishedReconciliation() called");
       }
   });
   ```
   
   and I can see in the logs that the callbacks are being called:
   
   ```
   2022-06-14 12:16:07,480 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-cluster] receivedEvent() called
   2022-06-14 12:16:07,481 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example] receivedEvent() called
   2022-06-14 12:16:07,592 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example] reconcileCustomResource() called
   2022-06-14 12:16:07,593 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example] receivedEvent() called
   2022-06-14 12:16:07,605 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example2] receivedEvent() called
   2022-06-14 12:16:07,607 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example2] reconcileCustomResource() called
   2022-06-14 12:16:07,608 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example2] receivedEvent() called
   2022-06-14 12:16:07,622 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example] receivedEvent() called
   2022-06-14 12:16:07,662 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example] receivedEvent() called
   2022-06-14 12:16:07,686 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example2] receivedEvent() called
   2022-06-14 12:16:07,778 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example2] receivedEvent() called
   2022-06-14 12:16:07,799 o.a.f.k.o.FlinkOperator        [INFO ] [default.basic-session-job-example] receivedEvent() called
   ```
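
The same callbacks could bump counters instead of writing log lines, which is roughly what forwarding to a metric registry amounts to. Below is a minimal, self-contained sketch under stated assumptions: `ReconcileCallbacks` and `CountingCallbacks` are hypothetical stand-ins that take plain `String` resource ids so the example compiles without JOSDK or Flink on the classpath. They are not the JOSDK `Metrics` interface and not the `FlinkOperatorMetrics` class from this PR; in the operator the counters would presumably live on the Flink `MetricGroup` rather than in a map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in mirroring the callback names overridden in the snippet above.
// Resource ids are plain strings here; the real callbacks receive JOSDK types.
interface ReconcileCallbacks {
    void receivedEvent(String resourceId);
    void reconcileCustomResource(String resourceId);
    void failedReconciliation(String resourceId, Exception exception);
    void cleanupDoneFor(String resourceId);
    void finishedReconciliation(String resourceId);
}

// Counts every callback twice: once in total and once per resource.
class CountingCallbacks implements ReconcileCallbacks {
    // Callbacks may arrive from several reconciler threads, hence the concurrent map.
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    private void inc(String callback, String resourceId) {
        counters.computeIfAbsent(callback, k -> new LongAdder()).increment();
        counters.computeIfAbsent(callback + "/" + resourceId, k -> new LongAdder()).increment();
    }

    // Returns 0 for keys that were never incremented.
    long count(String key) {
        LongAdder adder = counters.get(key);
        return adder == null ? 0L : adder.sum();
    }

    @Override
    public void receivedEvent(String resourceId) {
        inc("receivedEvent", resourceId);
    }

    @Override
    public void reconcileCustomResource(String resourceId) {
        inc("reconcileCustomResource", resourceId);
    }

    @Override
    public void failedReconciliation(String resourceId, Exception exception) {
        inc("failedReconciliation", resourceId);
    }

    @Override
    public void cleanupDoneFor(String resourceId) {
        inc("cleanupDoneFor", resourceId);
    }

    @Override
    public void finishedReconciliation(String resourceId) {
        inc("finishedReconciliation", resourceId);
    }
}
```

Replaying the first three log lines above against this sketch would leave `count("receivedEvent")` at 2 and `count("reconcileCustomResource/default.basic-session-job-example")` at 1. The per-resource key is only one possible naming scheme; how the PR scopes its metrics is not shown in this thread.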





[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#issuecomment-1155069030

   This looks much better; I'll try it locally later. We could probably add the documentation for the new metrics in this PR too.

