Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/17 13:13:38 UTC

[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #271: [FLINK-27820] Handle deployment errors on observe

gyfora opened a new pull request, #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271

   This change allows the operator to recover from errors that occur during, or directly after, Flink cluster submission. One common cause is temporary unavailability of the Kubernetes API, which prevents the operator from updating the status after deployment.
   
   Before this change, this led to a fatal error where the operator tried to submit the Flink cluster again and again, even though it was already deployed.
   
   To solve this, the following changes were introduced:
   
   - LastReconciledSpec now contains the CR generation of the deployed spec. During upgrades, this is recorded in the first SUSPEND step of the upgrade operation.
   - The CR generation is also added as an annotation to the Flink cluster's Deployment object.
   - The default ReconciliationState for new deployments is now UPGRADING (previously it was DEPLOYED).
   - In the observer, if the reconciliation state is UPGRADING (new or upgrading deployments), we check whether the Deployment exists and, if so, compare its generation annotation with the target generation. If they match, we know the upgrade was successful, so we update the status to DEPLOYED.
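The recovery flow described in the list above can be modelled in a few lines. This is a toy, dependency-free sketch, not the operator's code: the class `ObserveRecoverySketch`, the annotation key `example.org/cr-generation` (standing in for `FlinkUtils.CR_GENERATION_LABEL`, whose actual value is not shown here) and the plain `Map` standing in for the Deployment's annotations are all hypothetical. It simulates a submission whose status update is lost and shows the observer catching up, so the reconciler does not submit the cluster a second time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model of the recovery flow; all names here are hypothetical. */
class ObserveRecoverySketch {

    // Hypothetical key standing in for FlinkUtils.CR_GENERATION_LABEL.
    static final String GENERATION_ANNOTATION = "example.org/cr-generation";

    enum ReconciliationState { UPGRADING, DEPLOYED }

    // New deployments start in UPGRADING, as described in the PR.
    ReconciliationState state = ReconciliationState.UPGRADING;
    // Stand-in for the JobManager Deployment's annotations; null until a Deployment exists.
    Map<String, String> deploymentAnnotations;
    int submissions = 0;

    /** Submits the cluster and stamps the target generation on the Deployment. */
    void deploy(long generation, boolean statusUpdateFails) {
        submissions++;
        deploymentAnnotations = new HashMap<>();
        deploymentAnnotations.put(GENERATION_ANNOTATION, Long.toString(generation));
        if (!statusUpdateFails) {
            state = ReconciliationState.DEPLOYED;
        }
        // Otherwise (e.g. Kubernetes API unavailable) the status stays UPGRADING.
    }

    /** Observer: if UPGRADING but the Deployment already carries the target generation, mark DEPLOYED. */
    void observe(long targetGeneration) {
        if (state != ReconciliationState.UPGRADING || deploymentAnnotations == null) {
            return;
        }
        long deployedGeneration =
                Optional.ofNullable(deploymentAnnotations.get(GENERATION_ANNOTATION))
                        .map(Long::valueOf)
                        .orElse(-1L);
        if (deployedGeneration == targetGeneration) {
            state = ReconciliationState.DEPLOYED;
        }
    }

    /** Reconciler: only (re)submits if the observer did not catch up. */
    void reconcile(long targetGeneration) {
        if (state == ReconciliationState.UPGRADING) {
            deploy(targetGeneration, false);
        }
    }

    public static void main(String[] args) {
        ObserveRecoverySketch op = new ObserveRecoverySketch();
        op.deploy(1L, true); // submission succeeds, status update is lost
        op.observe(1L); // observer finds generation 1 on the Deployment
        op.reconcile(1L); // nothing left to do, no duplicate submission
        System.out.println(op.state + " after " + op.submissions + " submission(s)");
    }
}
```

Running `main` prints `DEPLOYED after 1 submission(s)`; if the annotation carried an older generation, the state would stay UPGRADING and the reconciler would submit again.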


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #271: [FLINK-27820] Handle deployment errors on observe

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#issuecomment-1160363793

   Thanks @gyfora for the PR, I added some comments.


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #271: [FLINK-27820] Handle deployment errors on observe

gyfora merged PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271


[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #271: [FLINK-27820] Handle deployment errors on observe

morhidi commented on code in PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#discussion_r901544604


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java:
##########
@@ -73,8 +71,6 @@
     private final MetricManager<FlinkDeployment> metricManager;
     private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
     private final EventRecorder eventRecorder;
-    private final ConcurrentHashMap<Tuple2<String, String>, FlinkDeploymentStatus> statusCache =

Review Comment:
   nit: we don't use the kubernetesClient here either



[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #271: [FLINK-27820] Handle deployment errors on observe

morhidi commented on code in PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#discussion_r901567381


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -252,6 +257,60 @@ private void onMissingDeployment(FlinkDeployment deployment) {
                 EventUtils.Component.JobManagerDeployment);
     }
 
+    /**
+     * Checks a deployment that is currently in the UPGRADING state whether it was already deployed
+     * but we simply miss the status information. After comparing the target resource generation
+     * with the one from the possible deployment if they match we update the status to the already
+     * DEPLOYED state.
+     *
+     * @param flinkDep Flink resource to check.
+     * @param context Context for reconciliation.
+     */
+    private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {

Review Comment:
   How about calling this method `syncWithDeployment` or similar? How problematic would it be to introduce a new DEPLOYING state?



[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #271: [FLINK-27820] Handle deployment errors on observe

morhidi commented on code in PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#discussion_r901588885


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -229,24 +227,32 @@ private static boolean upgradeStarted(
                 || currentReconState == ReconciliationState.UPGRADING;
     }
 
-    public static <T> T deserializedSpecWithVersion(
+    public static <T> Tuple2<T, ObjectNode> deserializeSpecWithMeta(
             @Nullable String specString, Class<T> specClass) {
         if (specString == null) {
             return null;
         }
 
         try {
             ObjectNode objectNode = (ObjectNode) objectMapper.readTree(specString);
+            ObjectNode internalMeta = (ObjectNode) objectNode.remove(INTERNAL_METADATA_JSON_KEY);
+
+            // backward compatibility
             objectNode.remove("apiVersion");
-            return objectMapper.treeToValue(objectNode, specClass);
+
+            return Tuple2.of(objectMapper.treeToValue(objectNode, specClass), internalMeta);
         } catch (JsonProcessingException e) {
             throw new RuntimeException("Could not deserialize spec, this indicates a bug...", e);
         }
     }
 
-    public static String writeSpecWithCurrentVersion(Object spec) {
+    public static String writeSpecWithMeta(

Review Comment:
   How about introducing a wrapper object here instead of modifying the ObjectNode?
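One possible reading of this suggestion, sketched without dependencies: return a small named type instead of `Tuple2<T, ObjectNode>`, so callers read `spec` and `generation()` rather than `f0`/`f1` and chained `get` calls. A `Map<String, Object>` stands in for Jackson's `ObjectNode`; the class names and the `example_internal_meta` key are hypothetical (the real value of `INTERNAL_METADATA_JSON_KEY` is not shown in the diff).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the wrapper-object idea; a Map stands in for Jackson's ObjectNode. */
class SpecWithMetaSketch {

    // Hypothetical value; the real INTERNAL_METADATA_JSON_KEY is not shown in the diff.
    static final String INTERNAL_METADATA_KEY = "example_internal_meta";

    /** Named wrapper replacing Tuple2<T, ObjectNode>: callers read fields, not f0/f1. */
    static final class SpecWithMeta {
        final Map<String, Object> spec;
        final Map<String, Object> meta; // null when the stored spec carries no internal metadata

        SpecWithMeta(Map<String, Object> spec, Map<String, Object> meta) {
            this.spec = spec;
            this.meta = meta;
        }

        /** Returns meta.metadata.generation, or -1 if any part of that path is missing. */
        long generation() {
            Object metadata = meta == null ? null : meta.get("metadata");
            Object gen = metadata instanceof Map ? ((Map<?, ?>) metadata).get("generation") : null;
            return gen instanceof Number ? ((Number) gen).longValue() : -1L;
        }
    }

    /** Splits a parsed spec tree into spec fields and internal metadata, leaving the input untouched. */
    @SuppressWarnings("unchecked")
    static SpecWithMeta deserialize(Map<String, Object> tree) {
        if (tree == null) {
            return null;
        }
        Map<String, Object> spec = new HashMap<>(tree); // copy so the caller's tree is not mutated
        Map<String, Object> meta = (Map<String, Object>) spec.remove(INTERNAL_METADATA_KEY);
        spec.remove("apiVersion"); // backward compatibility, mirroring the diff
        return new SpecWithMeta(spec, meta);
    }

    public static void main(String[] args) {
        Map<String, Object> tree = new HashMap<>();
        tree.put("job", "example");
        tree.put("apiVersion", "v1beta1");
        tree.put(INTERNAL_METADATA_KEY, Map.of("metadata", Map.of("generation", 4L)));

        SpecWithMeta parsed = deserialize(tree);
        System.out.println(parsed.spec.keySet() + " generation=" + parsed.generation());
    }
}
```

Running `main` prints `[job] generation=4`. Because `generation()` falls back to -1 when the internal metadata is absent (for example, a spec stored before this change), callers get a sentinel instead of dereferencing a missing node.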



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #271: [FLINK-27820] Handle deployment errors on observe

gyfora commented on code in PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#discussion_r901588881


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java:
##########
@@ -73,8 +71,6 @@
     private final MetricManager<FlinkDeployment> metricManager;
     private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
     private final EventRecorder eventRecorder;
-    private final ConcurrentHashMap<Tuple2<String, String>, FlinkDeploymentStatus> statusCache =

Review Comment:
   good point, will remove that too
   



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #271: [FLINK-27820] Handle deployment errors on observe

gyfora commented on code in PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#discussion_r901609776


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -252,6 +257,60 @@ private void onMissingDeployment(FlinkDeployment deployment) {
                 EventUtils.Component.JobManagerDeployment);
     }
 
+    /**
+     * Checks a deployment that is currently in the UPGRADING state whether it was already deployed
+     * but we simply miss the status information. After comparing the target resource generation
+     * with the one from the possible deployment if they match we update the status to the already
+     * DEPLOYED state.
+     *
+     * @param flinkDep Flink resource to check.
+     * @param context Context for reconciliation.
+     */
+    private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {

Review Comment:
   If I could, I would simply rename UPGRADING -> DEPLOYING. But since that's not an option here, I would prefer to keep calling it UPGRADING and use that state for initial deployments as well.



[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #271: [FLINK-27820] Handle deployment errors on observe

morhidi commented on code in PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#discussion_r901632601


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -252,6 +257,56 @@ private void onMissingDeployment(FlinkDeployment deployment) {
                 EventUtils.Component.JobManagerDeployment);
     }
 
+    /**
+     * Checks a deployment that is currently in the UPGRADING state whether it was already deployed
+     * but we simply miss the status information. After comparing the target resource generation
+     * with the one from the possible deployment if they match we update the status to the already
+     * DEPLOYED state.
+     *
+     * @param flinkDep Flink resource to check.
+     * @param context Context for reconciliation.
+     */
+    private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {
+        Optional<Deployment> depOpt = context.getSecondaryResource(Deployment.class);
+        var status = flinkDep.getStatus();
+        depOpt.ifPresent(
+                deployment -> {
+                    Map<String, String> annotations = deployment.getMetadata().getAnnotations();
+                    if (annotations == null) {
+                        return;
+                    }
+                    Long deployedGeneration =
+                            Optional.ofNullable(annotations.get(FlinkUtils.CR_GENERATION_LABEL))
+                                    .map(Long::valueOf)
+                                    .orElse(-1L);
+
+                    // For first deployments we get the generation from the metadata directly

Review Comment:
   nit: after moving the logic to a utility method, this comment seems redundant



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #271: [FLINK-27820] Handle deployment errors on observe

gyfora commented on code in PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#discussion_r901637961


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -252,6 +257,56 @@ private void onMissingDeployment(FlinkDeployment deployment) {
                 EventUtils.Component.JobManagerDeployment);
     }
 
+    /**
+     * Checks a deployment that is currently in the UPGRADING state whether it was already deployed
+     * but we simply miss the status information. After comparing the target resource generation
+     * with the one from the possible deployment if they match we update the status to the already
+     * DEPLOYED state.
+     *
+     * @param flinkDep Flink resource to check.
+     * @param context Context for reconciliation.
+     */
+    private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {
+        Optional<Deployment> depOpt = context.getSecondaryResource(Deployment.class);
+        var status = flinkDep.getStatus();
+        depOpt.ifPresent(
+                deployment -> {
+                    Map<String, String> annotations = deployment.getMetadata().getAnnotations();
+                    if (annotations == null) {
+                        return;
+                    }
+                    Long deployedGeneration =
+                            Optional.ofNullable(annotations.get(FlinkUtils.CR_GENERATION_LABEL))
+                                    .map(Long::valueOf)
+                                    .orElse(-1L);
+
+                    // For first deployments we get the generation from the metadata directly

Review Comment:
   yes, forgot this one



[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #271: [FLINK-27820] Handle deployment errors on observe

morhidi commented on code in PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#discussion_r901584493


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -252,6 +257,60 @@ private void onMissingDeployment(FlinkDeployment deployment) {
                 EventUtils.Component.JobManagerDeployment);
     }
 
+    /**
+     * Checks a deployment that is currently in the UPGRADING state whether it was already deployed
+     * but we simply miss the status information. After comparing the target resource generation
+     * with the one from the possible deployment if they match we update the status to the already
+     * DEPLOYED state.
+     *
+     * @param flinkDep Flink resource to check.
+     * @param context Context for reconciliation.
+     */
+    private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {
+        Optional<Deployment> depOpt = context.getSecondaryResource(Deployment.class);
+        var status = flinkDep.getStatus();
+        depOpt.ifPresent(
+                deployment -> {
+                    Map<String, String> annotations = deployment.getMetadata().getAnnotations();
+                    if (annotations == null) {
+                        return;
+                    }
+                    Long deployedGeneration =
+                            Optional.ofNullable(annotations.get(FlinkUtils.CR_GENERATION_LABEL))
+                                    .map(Long::valueOf)
+                                    .orElse(-1L);
+
+                    // For first deployments we get the generation from the metadata directly
+                    // otherwise take it simply from the lastReconciledSpec
+                    Long upgradeTargetGeneration =
+                            Optional.ofNullable(
+                                            status.getReconciliationStatus()
+                                                    .deserializeLastReconciledSpecWithMeta())
+                                    .map(t -> t.f1.get("metadata").get("generation").asLong(-1L))
+                                    .orElse(flinkDep.getMetadata().getGeneration());
+
+                    if (deployedGeneration.equals(upgradeTargetGeneration)) {
+                        logger.info(
+                                "Last reconciled generation is already deployed, setting reconciliation status to "
+                                        + ReconciliationState.DEPLOYED);
+
+                        var firstDeploy =

Review Comment:
   Can we move `firstDeploy` out of the `if` condition and use it when calculating `upgradeTargetGeneration`? The logic is hidden in the lambdas here, which makes the code hard to understand.
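A hedged sketch of the restructuring suggested here: compute `firstDeploy` once, up front, and branch on it explicitly instead of hiding the fallback in an `Optional` chain. The definition of `firstDeploy` used below (no previously reconciled generation) is an assumption, since its original initializer is cut off in the diff, and `TargetGenerationSketch` with its parameters is hypothetical.

```java
/** Hypothetical restructuring of the target-generation lookup with firstDeploy computed up front. */
class TargetGenerationSketch {

    /**
     * @param lastReconciledGeneration generation stored with the last reconciled spec, or null
     *     if nothing has been reconciled yet
     * @param resourceGeneration metadata.generation of the FlinkDeployment resource
     */
    static long upgradeTargetGeneration(Long lastReconciledGeneration, long resourceGeneration) {
        // Assumption: a first deployment is one with no previously reconciled generation.
        boolean firstDeploy = lastReconciledGeneration == null;
        if (firstDeploy) {
            // First deployment: take the generation from the resource metadata directly.
            return resourceGeneration;
        }
        // Otherwise use the generation recorded with the last reconciled spec.
        return lastReconciledGeneration;
    }

    /** True if the generation found on the Deployment matches the upgrade target. */
    static boolean alreadyDeployed(
            long deployedGeneration, Long lastReconciledGeneration, long resourceGeneration) {
        return deployedGeneration
                == upgradeTargetGeneration(lastReconciledGeneration, resourceGeneration);
    }

    public static void main(String[] args) {
        System.out.println(upgradeTargetGeneration(null, 5L)); // 5: first deployment
        System.out.println(upgradeTargetGeneration(3L, 5L)); // 3: from last reconciled spec
    }
}
```

With the flag named and computed first, both branches read as plain statements, and the same `firstDeploy` value is available to any later code in the method that needs it.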



[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #271: [FLINK-27820] Handle deployment errors on observe

morhidi commented on PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#issuecomment-1160436922

   > Fixed your comments @morhidi, please take a look :)
   
   LGTM, thanks!


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #271: [FLINK-27820] Handle deployment errors on observe

gyfora commented on code in PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#discussion_r901607778


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -229,24 +227,32 @@ private static boolean upgradeStarted(
                 || currentReconState == ReconciliationState.UPGRADING;
     }
 
-    public static <T> T deserializedSpecWithVersion(
+    public static <T> Tuple2<T, ObjectNode> deserializeSpecWithMeta(
             @Nullable String specString, Class<T> specClass) {
         if (specString == null) {
             return null;
         }
 
         try {
             ObjectNode objectNode = (ObjectNode) objectMapper.readTree(specString);
+            ObjectNode internalMeta = (ObjectNode) objectNode.remove(INTERNAL_METADATA_JSON_KEY);
+
+            // backward compatibility
             objectNode.remove("apiVersion");
-            return objectMapper.treeToValue(objectNode, specClass);
+
+            return Tuple2.of(objectMapper.treeToValue(objectNode, specClass), internalMeta);
         } catch (JsonProcessingException e) {
             throw new RuntimeException("Could not deserialize spec, this indicates a bug...", e);
         }
     }
 
-    public static String writeSpecWithCurrentVersion(Object spec) {
+    public static String writeSpecWithMeta(

Review Comment:
   makes sense



[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #271: [FLINK-27820] Handle deployment errors on observe

gyfora commented on PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#issuecomment-1158859281

   cc @Aitozi @morhidi @wangyang0918 


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #271: [FLINK-27820] Handle deployment errors on observe

gyfora commented on PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#issuecomment-1160408169

   Fixed your comments @morhidi, please take a look :)


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #271: [FLINK-27820] Handle deployment errors on observe

gyfora commented on code in PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#discussion_r901587462


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -252,6 +257,60 @@ private void onMissingDeployment(FlinkDeployment deployment) {
                 EventUtils.Component.JobManagerDeployment);
     }
 
+    /**
+     * Checks a deployment that is currently in the UPGRADING state whether it was already deployed
+     * but we simply miss the status information. After comparing the target resource generation
+     * with the one from the possible deployment if they match we update the status to the already
+     * DEPLOYED state.
+     *
+     * @param flinkDep Flink resource to check.
+     * @param context Context for reconciliation.
+     */
+    private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {

Review Comment:
   Well, this method specifically checks whether an UPGRADING deployment has already been upgraded. Calling it sync would give the impression that it does something more.
   
   Also, it's not really problematic to introduce a new DEPLOYING state, but it would add no value, so I would prefer not to do it.



[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #271: [FLINK-27820] Handle deployment errors on observe

morhidi commented on code in PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#discussion_r901623962


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -252,6 +257,60 @@ private void onMissingDeployment(FlinkDeployment deployment) {
                 EventUtils.Component.JobManagerDeployment);
     }
 
+    /**
+     * Checks a deployment that is currently in the UPGRADING state whether it was already deployed
+     * but we simply miss the status information. After comparing the target resource generation
+     * with the one from the possible deployment if they match we update the status to the already
+     * DEPLOYED state.
+     *
+     * @param flinkDep Flink resource to check.
+     * @param context Context for reconciliation.
+     */
+    private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {

Review Comment:
   Definitely, DEPLOYING + checkIfAlreadyDeployed would reflect both cases better. OK, I'll leave it up to you then :)



[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #271: [FLINK-27820] Handle deployment errors on observe

morhidi commented on code in PR #271:
URL: https://github.com/apache/flink-kubernetes-operator/pull/271#discussion_r901600207


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java:
##########
@@ -252,6 +257,60 @@ private void onMissingDeployment(FlinkDeployment deployment) {
                 EventUtils.Component.JobManagerDeployment);
     }
 
+    /**
+     * Checks a deployment that is currently in the UPGRADING state whether it was already deployed
+     * but we simply miss the status information. After comparing the target resource generation
+     * with the one from the possible deployment if they match we update the status to the already
+     * DEPLOYED state.
+     *
+     * @param flinkDep Flink resource to check.
+     * @param context Context for reconciliation.
+     */
+    private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context context) {

Review Comment:
   The way I understand it, the method catches up with initial deployments too, and the UPGRADING state covers both cases. Distinguishing two states, DEPLOYING and UPGRADING, makes sense to me, or alternatively naming the method `checkIfAlreadyDeployedOrUpgraded`. But maybe I don't fully understand the logic here.


