You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/14 09:53:43 UTC

[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #268: [FLINK-27914] Integrate JOSDK metrics with Flink Metrics reporter

morhidi commented on code in PR #268:
URL: https://github.com/apache/flink-kubernetes-operator/pull/268#discussion_r896612072


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java:
##########
@@ -81,59 +80,121 @@ public FlinkDeploymentController(
             ReconcilerFactory reconcilerFactory,
             ObserverFactory observerFactory,
             MetricManager<FlinkDeployment> metricManager,
+            Metrics metrics,
             StatusHelper<FlinkDeploymentStatus> statusHelper) {
         this.configManager = configManager;
         this.kubernetesClient = kubernetesClient;
         this.validators = validators;
         this.reconcilerFactory = reconcilerFactory;
         this.observerFactory = observerFactory;
         this.metricManager = metricManager;
+        this.metrics = metrics;
         this.statusHelper = statusHelper;
     }
 
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
-        LOG.info("Deleting FlinkDeployment");
-        statusHelper.updateStatusFromCache(flinkApp);
         try {
-            observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
-        } catch (DeploymentFailedException dfe) {
-            // ignore during cleanup
+            return metrics.timeControllerExecution(
+                    new Metrics.ControllerExecution<>() {
+                        @Override
+                        public String name() {
+                            return "cleanup";
+                        }
+
+                        @Override
+                        public String controllerName() {
+                            return FlinkDeploymentController.class.getSimpleName();
+                        }
+
+                        @Override
+                        public String successTypeName(DeleteControl deleteControl) {
+                            return deleteControl.isRemoveFinalizer()
+                                    ? "delete"
+                                    : "finalizerNotRemoved";
+                        }
+
+                        @Override
+                        public DeleteControl execute() {
+                            LOG.info("Deleting FlinkDeployment");
+                            statusHelper.updateStatusFromCache(flinkApp);
+                            try {
+                                observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
+                            } catch (DeploymentFailedException dfe) {
+                                // ignore during cleanup
+                            }
+                            metricManager.onRemove(flinkApp);
+                            statusHelper.removeCachedStatus(flinkApp);
+                            return reconcilerFactory
+                                    .getOrCreate(flinkApp)
+                                    .cleanup(flinkApp, context);
+                        }
+                    });
+        } catch (Exception e) {
+            throw new OperatorException(e);
         }
-        metricManager.onRemove(flinkApp);
-        statusHelper.removeCachedStatus(flinkApp);
-        return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, context);
     }
 
     @Override
     public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context)

Review Comment:
   ditto: https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java#L151



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java:
##########
@@ -70,37 +70,36 @@ public class FlinkOperator {
     private final FlinkConfigManager configManager;
     private final Set<FlinkResourceValidator> validators;
     private final MetricGroup metricGroup;
+    private final Metrics metrics;
+
+    private static final String OPERATOR_SDK_GROUP = "operator.sdk";
 
     public FlinkOperator(@Nullable Configuration conf) {
         this.client = new DefaultKubernetesClient();
         this.configManager = conf != null ? new FlinkConfigManager(conf) : new FlinkConfigManager();
-        this.operator =
-                new Operator(
-                        client,
-                        getConfigurationServiceOverriderConsumer(
-                                configManager.getOperatorConfiguration()));
-        this.flinkService = new FlinkService(client, configManager);
-        this.validators = ValidatorUtils.discoverValidators(configManager);
         this.metricGroup =
                 OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
+        this.metrics = new FlinkOperatorMetrics(metricGroup.addGroup(OPERATOR_SDK_GROUP));
+        this.operator = new Operator(client, getConfigurationServiceOverriderConsumer());
+        this.flinkService = new FlinkService(client, configManager);
+        this.validators = ValidatorUtils.discoverValidators(configManager);
         PluginManager pluginManager =
                 PluginUtils.createPluginManagerFromRootFolder(configManager.getDefaultConfig());
         FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
     }
 
-    @VisibleForTesting
-    protected static Consumer<ConfigurationServiceOverrider>
-            getConfigurationServiceOverriderConsumer(
-                    FlinkOperatorConfiguration operatorConfiguration) {
+    private Consumer<ConfigurationServiceOverrider> getConfigurationServiceOverriderConsumer() {
         return overrider -> {
-            int parallelism = operatorConfiguration.getReconcilerMaxParallelism();
+            int parallelism =
+                    configManager.getOperatorConfiguration().getReconcilerMaxParallelism();
             if (parallelism == -1) {
                 LOG.info("Configuring operator with unbounded reconciliation thread pool.");
                 overrider.withExecutorService(Executors.newCachedThreadPool());
             } else {
                 LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
                 overrider.withConcurrentReconciliationThreads(parallelism);
             }
+            overrider.withMetrics(metrics);

Review Comment:
   I only expected that we would need to implement the Metrics interface and register it here, and the rest is handle by JOSDK framework.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java:
##########
@@ -81,59 +80,121 @@ public FlinkDeploymentController(
             ReconcilerFactory reconcilerFactory,
             ObserverFactory observerFactory,
             MetricManager<FlinkDeployment> metricManager,
+            Metrics metrics,
             StatusHelper<FlinkDeploymentStatus> statusHelper) {
         this.configManager = configManager;
         this.kubernetesClient = kubernetesClient;
         this.validators = validators;
         this.reconcilerFactory = reconcilerFactory;
         this.observerFactory = observerFactory;
         this.metricManager = metricManager;
+        this.metrics = metrics;
         this.statusHelper = statusHelper;
     }
 
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {

Review Comment:
   I don't really understand this part. This must be handled at JOSDK level, isn't it?
   https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java#L109



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org