Posted to issues@flink.apache.org by "mbalassi (via GitHub)" <gi...@apache.org> on 2023/03/30 14:27:43 UTC

[GitHub] [flink-kubernetes-operator] mbalassi opened a new pull request, #558: [FLINK-31303] Expose Flink application resource usage via metrics and status

mbalassi opened a new pull request, #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558

   ## Brief change log
   
     - *Exposed JobManager and TaskManager resource usage to clusterInfo in status of the CR*
     - *The clusterInfo is then used to populate the metrics for the FlinkDeployment*
   
   ## Verifying this change
   
   Manually verified by submitting applications to a local cluster. TODO: add suitable unit tests, ideally by extending `FlinkDeploymentMetricsTest`; however, because the implementation relies on the status of the CR, this is more involved than ideal.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] mbalassi merged pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status

Posted by "mbalassi (via GitHub)" <gi...@apache.org>.
mbalassi merged PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558




[GitHub] [flink-kubernetes-operator] mbalassi commented on a diff in pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status

Posted by "mbalassi (via GitHub)" <gi...@apache.org>.
mbalassi commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1155274830


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -627,14 +637,42 @@ public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
                                             .toSeconds(),
                                     TimeUnit.SECONDS);
 
-            runtimeVersion.put(
+            clusterInfo.put(
                     DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
                     dashboardConfiguration.getFlinkVersion());
-            runtimeVersion.put(
+            clusterInfo.put(
                     DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
                     dashboardConfiguration.getFlinkRevision());
         }
-        return runtimeVersion;
+
+        // JobManager resource usage can be deduced from the CR
+        var jmParameters =
+                new KubernetesJobManagerParameters(
+                        conf, new KubernetesClusterClientFactory().getClusterSpecification(conf));
+        var jmTotalCpu =
+                jmParameters.getJobManagerCPU()
+                        * jmParameters.getJobManagerCPULimitFactor()
+                        * jmParameters.getReplicas();
+        var jmTotalMemory =
+                Math.round(
+                        jmParameters.getJobManagerMemoryMB()
+                                * Math.pow(1024, 2)
+                                * jmParameters.getJobManagerMemoryLimitFactor()
+                                * jmParameters.getReplicas());
+
+        // TaskManager resource usage is best gathered from the REST API to get current replicas

Review Comment:
   Thanks @mateczagany, this approach looks good. If you have the bandwidth would you mind pushing your suggestions to this PR branch so that the commit can be attributed to you? 😏 I have invited you as a collaborator to my fork, you might need to accept that.
   
   I would ask the following if you have the time:
   
   1. Get the resource configuration from the config, as you suggested, uniformly for JMs and TMs
   2. Get JM replicas from the config and TM replicas from the REST API (we are being careful with the TM replicas because we foresee that the autoscaler might soon change them dynamically)
   3. Add a test to `FlinkDeploymentMetricsTest` that verifies that, given a properly filled-out `status.clusterInfo`, we populate the metrics properly.
   
   Currently we do not have a meaningful test for creating the clusterInfo, and since we are relying on the application's REST API I do not see an easy way of testing it properly, so I would accept this change without that (but it might merit a separate JIRA).
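
A minimal sketch of the split described in the list above: per-replica CPU and limit factors come from the config for both JMs and TMs, the JM replica count comes from the config, and the TM replica count is passed in by the caller (for example, as observed via the REST API). Taking the TM count as a plain parameter keeps the calculation testable without a running cluster. The class name, the `Map`-based config, the `kubernetes.jobmanager.*` key names, and the default values are assumptions for illustration; this is not the operator's actual code.

```java
import java.util.Map;

/** Hypothetical helper for illustration only; not the operator's actual implementation. */
final class ResourceUsageSketch {

    // The TM keys are quoted in this thread; the JM keys are assumed to follow the same naming.
    static final String JM_CPU = "kubernetes.jobmanager.cpu";
    static final String JM_CPU_LIMIT_FACTOR = "kubernetes.jobmanager.cpu.limit-factor";
    static final String JM_REPLICAS = "kubernetes.jobmanager.replicas";
    static final String TM_CPU = "kubernetes.taskmanager.cpu";
    static final String TM_CPU_LIMIT_FACTOR = "kubernetes.taskmanager.cpu.limit-factor";

    /** Reads a double from the config, falling back to an (assumed) default when the key is absent. */
    static double getDouble(Map<String, String> conf, String key, double defaultValue) {
        String raw = conf.get(key);
        return raw == null ? defaultValue : Double.parseDouble(raw);
    }

    /**
     * Total CPU = JM share (replica count from the config)
     *           + TM share (replica count supplied by the caller).
     */
    static double totalCpu(Map<String, String> conf, int observedTaskManagers) {
        double jmCpu =
                getDouble(conf, JM_CPU, 1.0)
                        * getDouble(conf, JM_CPU_LIMIT_FACTOR, 1.0)
                        * getDouble(conf, JM_REPLICAS, 1.0);
        double tmCpu =
                getDouble(conf, TM_CPU, 1.0)
                        * getDouble(conf, TM_CPU_LIMIT_FACTOR, 1.0)
                        * observedTaskManagers;
        return jmCpu + tmCpu;
    }
}
```

Because the TM count is an argument rather than a REST call inside the method, a unit test can exercise the arithmetic with any replica count.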





[GitHub] [flink-kubernetes-operator] mateczagany commented on a diff in pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status

Posted by "mateczagany (via GitHub)" <gi...@apache.org>.
mateczagany commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1156784709


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -645,10 +641,19 @@ public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
                     dashboardConfiguration.getFlinkRevision());
         }
 
-        // JobManager resource usage can be deduced from the CR
-        var jmParameters =
-                new KubernetesJobManagerParameters(
-                        conf, new KubernetesClusterClientFactory().getClusterSpecification(conf));
+        clusterInfo.putAll(
+                calculateClusterResourceMetrics(
+                        conf, getTaskManagersInfo(conf).getTaskManagerInfos().size()));
+
+        return clusterInfo;
+    }
+
+    private HashMap<String, String> calculateClusterResourceMetrics(

Review Comment:
   I have added the new tests in FlinkUtils and rebased to main





[GitHub] [flink-kubernetes-operator] mbalassi commented on pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status

Posted by "mbalassi (via GitHub)" <gi...@apache.org>.
mbalassi commented on PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#issuecomment-1495907324

   Thanks @mateczagany, this is great. I squashed your work into one commit and will merge later today.




[GitHub] [flink-kubernetes-operator] mbalassi commented on a diff in pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status

Posted by "mbalassi (via GitHub)" <gi...@apache.org>.
mbalassi commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1155589002


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java:
##########
@@ -187,6 +193,66 @@ public void testMetricsMultiNamespace() {
         }
     }
 
+    @Test
+    public void testResourceMetrics() {
+        var namespace1 = "ns1";
+        var namespace2 = "ns2";
+        var deployment1 = TestUtils.buildApplicationCluster("deployment1", namespace1);
+        var deployment2 = TestUtils.buildApplicationCluster("deployment2", namespace1);
+        var deployment3 = TestUtils.buildApplicationCluster("deployment3", namespace2);
+
+        deployment1
+                .getStatus()
+                .getClusterInfo()
+                .putAll(
+                        Map.of(
+                                AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "5",

Review Comment:
   Could you please add a test that has unexpected values (null, empty string, etc.)? Given that the status field could be (but is not expected to be) modified externally, we want to make sure that the operator logic does not fail on that. (This is why I used `NumberUtils` in the implementation.)



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -645,10 +641,19 @@ public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
                     dashboardConfiguration.getFlinkRevision());
         }
 
-        // JobManager resource usage can be deduced from the CR
-        var jmParameters =
-                new KubernetesJobManagerParameters(
-                        conf, new KubernetesClusterClientFactory().getClusterSpecification(conf));
+        clusterInfo.putAll(
+                calculateClusterResourceMetrics(
+                        conf, getTaskManagersInfo(conf).getTaskManagerInfos().size()));
+
+        return clusterInfo;
+    }
+
+    private HashMap<String, String> calculateClusterResourceMetrics(

Review Comment:
   nit: maybe call this `calculateClusterResourceUsage` or `calculateClusterResourceFootprint`, since technically these are not the metrics yet.





[GitHub] [flink-kubernetes-operator] mbalassi commented on a diff in pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status

Posted by "mbalassi (via GitHub)" <gi...@apache.org>.
mbalassi commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1154032017


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -627,14 +637,42 @@ public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
                                             .toSeconds(),
                                     TimeUnit.SECONDS);
 
-            runtimeVersion.put(
+            clusterInfo.put(
                     DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
                     dashboardConfiguration.getFlinkVersion());
-            runtimeVersion.put(
+            clusterInfo.put(
                     DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
                     dashboardConfiguration.getFlinkRevision());
         }
-        return runtimeVersion;
+
+        // JobManager resource usage can be deduced from the CR
+        var jmParameters =
+                new KubernetesJobManagerParameters(
+                        conf, new KubernetesClusterClientFactory().getClusterSpecification(conf));
+        var jmTotalCpu =
+                jmParameters.getJobManagerCPU()
+                        * jmParameters.getJobManagerCPULimitFactor()
+                        * jmParameters.getReplicas();
+        var jmTotalMemory =
+                Math.round(
+                        jmParameters.getJobManagerMemoryMB()
+                                * Math.pow(1024, 2)
+                                * jmParameters.getJobManagerMemoryLimitFactor()
+                                * jmParameters.getReplicas());
+
+        // TaskManager resource usage is best gathered from the REST API to get current replicas

Review Comment:
   Good catch @mateczagany. I had this suspicion in the back of my mind, that the CPU consumption might be overreported, but the way we pass the values to the taskmanagers via `flink-kubernetes` (which does have proper fractional values) convinced me that it should be ok. I will dive a bit deeper into this and come back.





[GitHub] [flink-kubernetes-operator] mateczagany commented on a diff in pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status

Posted by "mateczagany (via GitHub)" <gi...@apache.org>.
mateczagany commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1156268027


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/FlinkDeploymentMetricsTest.java:
##########
@@ -187,6 +193,66 @@ public void testMetricsMultiNamespace() {
         }
     }
 
+    @Test
+    public void testResourceMetrics() {
+        var namespace1 = "ns1";
+        var namespace2 = "ns2";
+        var deployment1 = TestUtils.buildApplicationCluster("deployment1", namespace1);
+        var deployment2 = TestUtils.buildApplicationCluster("deployment2", namespace1);
+        var deployment3 = TestUtils.buildApplicationCluster("deployment3", namespace2);
+
+        deployment1
+                .getStatus()
+                .getClusterInfo()
+                .putAll(
+                        Map.of(
+                                AbstractFlinkService.FIELD_NAME_TOTAL_CPU, "5",

Review Comment:
   I've added the tests and also added a check that converts `Infinity` and `NaN` values to 0.
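
A minimal sketch of the kind of defensive parsing discussed here, assuming the `clusterInfo` values arrive as strings that may have been edited externally. The thread mentions `NumberUtils`; this sketch uses only the JDK, and the class and method names are hypothetical. Null, empty, malformed, `NaN`, and infinite inputs all map to 0.

```java
/** Hypothetical helper for illustration only; not the operator's actual implementation. */
final class LenientNumbers {

    /** Parses a status value leniently so that unexpected input never throws. */
    static double toFiniteDoubleOrZero(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return 0.0; // missing or blank value
        }
        try {
            double value = Double.parseDouble(raw.trim());
            // Double.parseDouble accepts "NaN" and "Infinity"; treat both as 0.
            return Double.isFinite(value) ? value : 0.0;
        } catch (NumberFormatException e) {
            return 0.0; // malformed number
        }
    }
}
```

Mapping every bad input to the same neutral value keeps the metric gauges well-defined even if the status field was modified by hand.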





[GitHub] [flink-kubernetes-operator] mateczagany commented on a diff in pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status

Posted by "mateczagany (via GitHub)" <gi...@apache.org>.
mateczagany commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1155350156


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -627,14 +637,42 @@ public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
                                             .toSeconds(),
                                     TimeUnit.SECONDS);
 
-            runtimeVersion.put(
+            clusterInfo.put(
                     DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
                     dashboardConfiguration.getFlinkVersion());
-            runtimeVersion.put(
+            clusterInfo.put(
                     DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
                     dashboardConfiguration.getFlinkRevision());
         }
-        return runtimeVersion;
+
+        // JobManager resource usage can be deduced from the CR
+        var jmParameters =
+                new KubernetesJobManagerParameters(
+                        conf, new KubernetesClusterClientFactory().getClusterSpecification(conf));
+        var jmTotalCpu =
+                jmParameters.getJobManagerCPU()
+                        * jmParameters.getJobManagerCPULimitFactor()
+                        * jmParameters.getReplicas();
+        var jmTotalMemory =
+                Math.round(
+                        jmParameters.getJobManagerMemoryMB()
+                                * Math.pow(1024, 2)
+                                * jmParameters.getJobManagerMemoryLimitFactor()
+                                * jmParameters.getReplicas());
+
+        // TaskManager resource usage is best gathered from the REST API to get current replicas

Review Comment:
   I've pushed the changes you requested and also extracted the logic into a new method so that we can test it more easily without needing the REST API. I just wasn't sure where to place the test, as I'm not that familiar with the project structure yet :D 
   
   If you think the PR looks ok, please let me know where you think I should write a test for `AbstractFlinkService#calculateClusterResourceMetrics`, and I will do that as well.





[GitHub] [flink-kubernetes-operator] mateczagany commented on a diff in pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status

Posted by "mateczagany (via GitHub)" <gi...@apache.org>.
mateczagany commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1154613982


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -627,14 +637,42 @@ public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
                                             .toSeconds(),
                                     TimeUnit.SECONDS);
 
-            runtimeVersion.put(
+            clusterInfo.put(
                     DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
                     dashboardConfiguration.getFlinkVersion());
-            runtimeVersion.put(
+            clusterInfo.put(
                     DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
                     dashboardConfiguration.getFlinkRevision());
         }
-        return runtimeVersion;
+
+        // JobManager resource usage can be deduced from the CR
+        var jmParameters =
+                new KubernetesJobManagerParameters(
+                        conf, new KubernetesClusterClientFactory().getClusterSpecification(conf));
+        var jmTotalCpu =
+                jmParameters.getJobManagerCPU()
+                        * jmParameters.getJobManagerCPULimitFactor()
+                        * jmParameters.getReplicas();
+        var jmTotalMemory =
+                Math.round(
+                        jmParameters.getJobManagerMemoryMB()
+                                * Math.pow(1024, 2)
+                                * jmParameters.getJobManagerMemoryLimitFactor()
+                                * jmParameters.getReplicas());
+
+        // TaskManager resource usage is best gathered from the REST API to get current replicas

Review Comment:
   I tried to implement the same logic for `tmTotalCpu` as you did for `jmTotalCpu`, and I think it should be valid: `tmCpuRequest * tmCpuLimitFactor * numberOfTaskManagers`
   
   `tmCpuRequest` and `tmCpuLimitFactor` are accessible the same way as for the JM: just retrieve `kubernetes.taskmanager.cpu` and `kubernetes.taskmanager.cpu.limit-factor` from the Flink config.
   
   I'm not sure about `numberOfTaskManagers`. In my test I just retrieved the number of TMs from the Flink REST API; maybe we could use `FlinkUtils#getNumTaskManagers` instead.
   
   Code:
   ```
   var tmTotalCpu =
           tmHardwareDesc.get().count()
                   * conf.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU)
                   * conf.getDouble(KubernetesConfigOptions.TASK_MANAGER_CPU_LIMIT_FACTOR);
   ```
   
   Limit factors:
   ```
   kubernetes.taskmanager.cpu.limit-factor = 1.3
   kubernetes.jobmanager.cpu.limit-factor = 1.3
   ```
   
   Result:
   ```
   Job Manager:
     Replicas:            2
     Resource:
       Cpu:          0.5
       Memory:       1g
   Task Manager:
     Replicas:            2
     Resource:
       Cpu:     0.5
       Memory:  1g
   Status:
     Cluster Info:
       Total - Cpu:                  2.6
       Total - Memory:               4294967296
   ```
   
   Do you think this could work?
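
The totals in the result above can be reproduced by hand. The sketch below redoes the arithmetic with plain numbers, assuming that `1g` means 1 GiB and that no memory limit factor other than 1.0 is applied (the reported memory total is consistent with that). The class and method names are hypothetical.

```java
/** Hypothetical arithmetic check for illustration only. */
final class ReportedTotalsCheck {

    static final long BYTES_PER_GIB = 1024L * 1024L * 1024L;

    /** CPU for one component: replicas x per-replica request x limit factor. */
    static double componentCpu(int replicas, double cpuRequest, double limitFactor) {
        return replicas * cpuRequest * limitFactor;
    }

    /** Memory in bytes for one component, assuming an effective memory limit factor of 1.0. */
    static long componentMemoryBytes(int replicas, long gibPerReplica) {
        return replicas * gibPerReplica * BYTES_PER_GIB;
    }

    public static void main(String[] args) {
        // 2 JMs and 2 TMs, each with 0.5 CPU, 1g memory, and a CPU limit factor of 1.3.
        double totalCpu = componentCpu(2, 0.5, 1.3) + componentCpu(2, 0.5, 1.3);
        long totalMemory = componentMemoryBytes(2, 1) + componentMemoryBytes(2, 1);
        System.out.println("total cpu    = " + totalCpu);
        System.out.println("total memory = " + totalMemory);
    }
}
```

Each component contributes 2 x 0.5 x 1.3 = 1.3 CPUs, giving 2.6 in total, and four replicas at 1 GiB each give 4294967296 bytes, matching the `Cluster Info` values.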





[GitHub] [flink-kubernetes-operator] mbalassi commented on a diff in pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status

Posted by "mbalassi (via GitHub)" <gi...@apache.org>.
mbalassi commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1154290969


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -627,14 +637,42 @@ public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
                                             .toSeconds(),
                                     TimeUnit.SECONDS);
 
-            runtimeVersion.put(
+            clusterInfo.put(
                     DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
                     dashboardConfiguration.getFlinkVersion());
-            runtimeVersion.put(
+            clusterInfo.put(
                     DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
                     dashboardConfiguration.getFlinkRevision());
         }
-        return runtimeVersion;
+
+        // JobManager resource usage can be deduced from the CR
+        var jmParameters =
+                new KubernetesJobManagerParameters(
+                        conf, new KubernetesClusterClientFactory().getClusterSpecification(conf));
+        var jmTotalCpu =
+                jmParameters.getJobManagerCPU()
+                        * jmParameters.getJobManagerCPULimitFactor()
+                        * jmParameters.getReplicas();
+        var jmTotalMemory =
+                Math.round(
+                        jmParameters.getJobManagerMemoryMB()
+                                * Math.pow(1024, 2)
+                                * jmParameters.getJobManagerMemoryLimitFactor()
+                                * jmParameters.getReplicas());
+
+        // TaskManager resource usage is best gathered from the REST API to get current replicas

Review Comment:
   There is a limit factor for TaskManager cores that Flink allows to be configured on top of the resources defined on the Kubernetes level, similarly to how I calculated the JobManager resources. I set up an example to validate your suggestion where I have one JM and one TM, each with 0.5 cpus configured in the resources field. The cpu limit factors are 1.0. We end up with 1.5 cpus (0.5 for the JM, accurately reported, and 1.0 for the TM).
   
   ```
     jobManager:
       replicas: 1
       resource:
         cpu: 0.5
         memory: 2048m
     serviceAccount: flink
     taskManager:
       resource:
         cpu: 0.5
         memory: 2048m
   status:
     clusterInfo:
       flink-revision: DeadD0d0 @ 1970-01-01T01:00:00+01:00
       flink-version: 1.16.1
       tm-cpu-limit-factor: "1.0"
       jm-cpu-limit-factor: "1.0"
       total-cpu: "1.5"
       total-memory: "4294967296"
     jobManagerDeploymentStatus: READY
   ```
   
   It is a bit of a tough problem, because the Flink UI also shows 1 core for the TM (using the same value that we get from the REST API).
   
   <img width="1403" alt="Screenshot 2023-03-31 at 12 08 26" src="https://user-images.githubusercontent.com/5990983/229091963-f5e9a985-2ebe-4518-9623-6a4d4da9ad3c.png">
   
   So ultimately we have to decide whether to stick with Flink or with Kubernetes. I am leaning towards the latter (factoring in the limit factor, but avoiding the rounding).





[GitHub] [flink-kubernetes-operator] mateczagany commented on a diff in pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status

Posted by "mateczagany (via GitHub)" <gi...@apache.org>.
mateczagany commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1156270801


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -645,10 +641,19 @@ public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
                     dashboardConfiguration.getFlinkRevision());
         }
 
-        // JobManager resource usage can be deduced from the CR
-        var jmParameters =
-                new KubernetesJobManagerParameters(
-                        conf, new KubernetesClusterClientFactory().getClusterSpecification(conf));
+        clusterInfo.putAll(
+                calculateClusterResourceMetrics(
+                        conf, getTaskManagersInfo(conf).getTaskManagerInfos().size()));
+
+        return clusterInfo;
+    }
+
+    private HashMap<String, String> calculateClusterResourceMetrics(

Review Comment:
   You're right! I've also split the method into two separate methods in `FlinkUtils`. This results in some duplicated code, but I think it improves the code overall, and it is easier to re-use and test this way.
   
   I will add tests for the two new methods tomorrow if this seems okay.





[GitHub] [flink-kubernetes-operator] mateczagany commented on a diff in pull request #558: [FLINK-31303] Expose Flink application resource usage via metrics and status

Posted by "mateczagany (via GitHub)" <gi...@apache.org>.
mateczagany commented on code in PR #558:
URL: https://github.com/apache/flink-kubernetes-operator/pull/558#discussion_r1153504166


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -627,14 +637,42 @@ public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
                                             .toSeconds(),
                                     TimeUnit.SECONDS);
 
-            runtimeVersion.put(
+            clusterInfo.put(
                     DashboardConfiguration.FIELD_NAME_FLINK_VERSION,
                     dashboardConfiguration.getFlinkVersion());
-            runtimeVersion.put(
+            clusterInfo.put(
                     DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
                     dashboardConfiguration.getFlinkRevision());
         }
-        return runtimeVersion;
+
+        // JobManager resource usage can be deduced from the CR
+        var jmParameters =
+                new KubernetesJobManagerParameters(
+                        conf, new KubernetesClusterClientFactory().getClusterSpecification(conf));
+        var jmTotalCpu =
+                jmParameters.getJobManagerCPU()
+                        * jmParameters.getJobManagerCPULimitFactor()
+                        * jmParameters.getReplicas();
+        var jmTotalMemory =
+                Math.round(
+                        jmParameters.getJobManagerMemoryMB()
+                                * Math.pow(1024, 2)
+                                * jmParameters.getJobManagerMemoryLimitFactor()
+                                * jmParameters.getReplicas());
+
+        // TaskManager resource usage is best gathered from the REST API to get current replicas

Review Comment:
   If fractional values are used for the CPU, there will be a difference between retrieving it from the Flink REST API and from the Kubernetes CR. Flink uses `Hardware.getNumberCPUCores()` under the hood to retrieve this value. I'm not sure exactly how that works, but it's definitely an integer in the end :D 
   
   This will lead to weird scenarios: if you have 3 JM and 3 TM replicas, all with `.5` CPU shares, the result will be `4.5` total CPUs (1.5 for the JMs plus 3 whole cores for the TMs) rather than `3.0`.
   
   An easy solution might be to just retrieve the number of TMs and multiply it by the CPU defined in the CR.
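
A small sketch contrasting the two ways of totalling CPU in the scenario above. It assumes each TM reports one whole core when configured with 0.5 CPU (which is what yields the `4.5` figure) and a limit factor of 1.0. The class and method names are hypothetical.

```java
/** Hypothetical comparison for illustration only. */
final class CpuSourceComparison {

    /** TM share taken from a whole-number core count, as the REST API value is described. */
    static double totalWithReportedCores(
            int jmReplicas, double jmCpu, int tmReplicas, int reportedCoresPerTm) {
        return jmReplicas * jmCpu + (double) tmReplicas * reportedCoresPerTm;
    }

    /** TM share taken as the TM count multiplied by the fractional CPU defined in the CR. */
    static double totalWithConfiguredCpu(
            int jmReplicas, double jmCpu, int tmReplicas, double tmCpu) {
        return jmReplicas * jmCpu + tmReplicas * tmCpu;
    }

    public static void main(String[] args) {
        // 3 JMs and 3 TMs, all configured with 0.5 CPU.
        System.out.println(totalWithReportedCores(3, 0.5, 3, 1));   // over-reports the TM share
        System.out.println(totalWithConfiguredCpu(3, 0.5, 3, 0.5)); // keeps the fractional values
    }
}
```

With whole-number cores the TMs contribute 3.0 instead of 1.5, which is the over-reporting being discussed; multiplying the TM count by the configured CPU avoids it.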


