Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/19 21:43:06 UTC

[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #489: [FLINK-30406] Detect when jobmanager never started

gyfora opened a new pull request, #489:
URL: https://github.com/apache/flink-kubernetes-operator/pull/489

   ## What is the purpose of the change
   
   The purpose of this PR is to fix the long-standing, annoying case where a job gets stuck in a non-upgradable state after starting/upgrading from a savepoint because the JobManager never starts.
   
   Previously, in these cases we only supported last-state (HA-based) upgrade, which was impossible if the JM never started and therefore never created the HA metadata ConfigMaps.
   
   The PR introduces a check whether the JobManager pods ever started by checking the Availability conditions on the JM deployment and comparing condition times with the deployment creation timestamp.
   
   If availability is False and the Deployment never transitioned out of this state after creation, we can then assume that the JM never started and we can perform the upgrade using the last recorded savepoint.
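
   The check described above can be sketched roughly as follows. This is a minimal illustration and not the code from the PR: `JmStartCheckSketch`, `Condition`, and `jmNeverStarted` are hypothetical names, the Deployment is reduced to its creation timestamp and a list of conditions, and "never transitioned after creation" is assumed to mean that the condition's last transition time is not later than the creation timestamp.

```java
import java.time.Instant;
import java.util.List;

/** Hypothetical, simplified stand-in for the Deployment fields the check looks at. */
final class JmStartCheckSketch {

    /** One entry of the Deployment's status conditions (simplified, assumed shape). */
    static final class Condition {
        final String type;                // e.g. "Available"
        final String status;              // "True" or "False"
        final Instant lastTransitionTime; // when the condition last changed status

        Condition(String type, String status, Instant lastTransitionTime) {
            this.type = type;
            this.status = status;
            this.lastTransitionTime = lastTransitionTime;
        }
    }

    /**
     * Returns true if the Available condition is False and has not changed since the
     * Deployment was created, i.e. the JobManager was never available.
     */
    static boolean jmNeverStarted(Instant creationTimestamp, List<Condition> conditions) {
        for (Condition c : conditions) {
            if ("Available".equals(c.type)) {
                return "False".equals(c.status)
                        && !c.lastTransitionTime.isAfter(creationTimestamp);
            }
        }
        // No Available condition reported yet: make no assumption.
        return false;
    }

    public static void main(String[] args) {
        Instant created = Instant.parse("2022-12-19T21:00:00Z");
        List<Condition> stuck = List.of(new Condition("Available", "False", created));
        List<Condition> failedLater =
                List.of(new Condition("Available", "False", created.plusSeconds(300)));

        System.out.println(jmNeverStarted(created, stuck));       // true
        System.out.println(jmNeverStarted(created, failedLater)); // false
    }
}
```

   A JobManager that became available and later failed has a transition time after creation, so it is not treated as "never started" in this sketch.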
   
   This also removes the slightly ad hoc logic we had in place for upgrades on initial deployments before reaching a stable state (which was basically intended to work around this limitation).
   
   ## Verifying this change
   
   Unit tests + manual testing on minikube
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., are there any changes to the `CustomResourceDescriptors`: no
     - Core observer or reconciler logic that is regularly executed: yes
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #489: [FLINK-30406] Detect when jobmanager never started

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #489:
URL: https://github.com/apache/flink-kubernetes-operator/pull/489#discussion_r1053822550


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java:
##########
@@ -260,11 +265,96 @@ private FlinkDeployment cloneDeploymentWithUpgradeMode(
     }
 
     @ParameterizedTest
-    @EnumSource(UpgradeMode.class)
-    public void testUpgradeBeforeReachingStableSpec(UpgradeMode upgradeMode) throws Exception {
+    @MethodSource("testUpgradeJmDeployCannotStartParams")
+    public void testUpgradeJmDeployCannotStart(UpgradeMode fromMode, UpgradeMode toMode)
+            throws Exception {

Review Comment:
   Gyula, does it make sense to break this test up into multiple methods? It's really hard to decode what the internal status changes / checks actually mean.
   - succesfulJobSubmit
   - suspendWithSavepoint
   - etc..





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #489: [FLINK-30406] Detect when jobmanager never started

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #489:
URL: https://github.com/apache/flink-kubernetes-operator/pull/489#discussion_r1053364035


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -116,9 +119,23 @@ public final void reconcile(CR cr, Context<?> ctx) throws Exception {
         if (reconciliationStatus.isBeforeFirstDeployment()) {
             LOG.info("Deploying for the first time");
 
+            if (spec.getJob() != null) {
+                var initialUpgradeMode = UpgradeMode.STATELESS;
+                var initialSp = spec.getJob().getInitialSavepointPath();
+
+                if (initialSp != null) {
+                    status.getJobStatus()
+                            .getSavepointInfo()
+                            .setLastSavepoint(
+                                    Savepoint.of(initialSp, SavepointTriggerType.UNKNOWN));
+                    initialUpgradeMode = UpgradeMode.SAVEPOINT;
+                }
+
+                spec.getJob().setUpgradeMode(initialUpgradeMode);

Review Comment:
   Also, I would note that this only happens when what the user requested in the spec cannot be applied directly and we apply an allowed but different upgrade mode instead. It would be strange if this was not reflected in the status, and it would make it difficult to build features that require knowing what was actually done during the last upgrade.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #489: [FLINK-30406] Detect when jobmanager never started

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #489:
URL: https://github.com/apache/flink-kubernetes-operator/pull/489#discussion_r1058387166


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -112,18 +116,23 @@ protected Optional<UpgradeMode> getAvailableUpgradeMode(
                 && !flinkVersionChanged(
                         ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
 
-            if (!flinkService.isHaMetadataAvailable(deployConfig)) {
-                if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) {
-                    // initial deployment failure, reset to allow for spec change to proceed
-                    return resetOnMissingStableSpec(deployment, deployConfig);
-                }
-            } else {
+            if (flinkService.isHaMetadataAvailable(deployConfig)) {
                 LOG.info(
                         "Job is not running but HA metadata is available for last state restore, ready for upgrade");
                 return Optional.of(UpgradeMode.LAST_STATE);
             }
         }
 
+        if (status.getReconciliationStatus()
+                                .deserializeLastReconciledSpec()
+                                .getJob()
+                                .getUpgradeMode()
+                        != UpgradeMode.LAST_STATE
+                && FlinkUtils.jmPodNeverStarted(ctx)) {
+            deleteJmThatNeverStarted(deployment, deployConfig);
+            return getAvailableUpgradeMode(deployment, ctx, deployConfig, observeConfig);

Review Comment:
   I improved the check to make this clearer, but essentially, after we delete the JobManager deployment, the job will be put in a terminal state and the JmDeployment in the MISSING state. Therefore this branch would not be hit again.
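
   The termination argument can be illustrated with a small, self-contained sketch. Everything here is hypothetical and heavily simplified (`RecursionGuardSketch`, `State`, and the two enums are not the operator's classes): the recursive call is guarded by a condition that the deletion itself makes false, so the recursion goes at most one level deep.

```java
import java.util.Optional;

/** Hypothetical model of the guarded recursion; not the operator's actual code. */
final class RecursionGuardSketch {

    enum JmStatus { DEPLOYING, MISSING }

    enum UpgradeMode { STATELESS, SAVEPOINT, LAST_STATE }

    /** Minimal mutable state standing in for the resource status. */
    static final class State {
        JmStatus jmStatus = JmStatus.DEPLOYING;
        boolean jmEverStarted = false;
        boolean savepointRecorded = true;
        int deletions = 0;
    }

    static Optional<UpgradeMode> availableUpgradeMode(State s) {
        // Guard: only true while a never-started JM deployment still exists.
        if (s.jmStatus != JmStatus.MISSING && !s.jmEverStarted) {
            // "Deleting" the deployment moves it to MISSING, which disables the guard.
            s.jmStatus = JmStatus.MISSING;
            s.deletions++;
            // Re-evaluate once against the updated state.
            return availableUpgradeMode(s);
        }
        if (s.jmStatus == JmStatus.MISSING && s.savepointRecorded) {
            return Optional.of(UpgradeMode.SAVEPOINT);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        State s = new State();
        System.out.println(availableUpgradeMode(s)); // Optional[SAVEPOINT]
        System.out.println(s.deletions);             // 1
    }
}
```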





[GitHub] [flink-kubernetes-operator] tweise commented on a diff in pull request #489: [FLINK-30406] Detect when jobmanager never started

Posted by GitBox <gi...@apache.org>.
tweise commented on code in PR #489:
URL: https://github.com/apache/flink-kubernetes-operator/pull/489#discussion_r1053332127


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -116,9 +119,23 @@ public final void reconcile(CR cr, Context<?> ctx) throws Exception {
         if (reconciliationStatus.isBeforeFirstDeployment()) {
             LOG.info("Deploying for the first time");
 
+            if (spec.getJob() != null) {
+                var initialUpgradeMode = UpgradeMode.STATELESS;
+                var initialSp = spec.getJob().getInitialSavepointPath();
+
+                if (initialSp != null) {
+                    status.getJobStatus()
+                            .getSavepointInfo()
+                            .setLastSavepoint(
+                                    Savepoint.of(initialSp, SavepointTriggerType.UNKNOWN));
+                    initialUpgradeMode = UpgradeMode.SAVEPOINT;
+                }
+
+                spec.getJob().setUpgradeMode(initialUpgradeMode);

Review Comment:
   @gyula-fora this wasn't resolved. I would expect the "reconciled" (and stable) spec to match the spec that was supplied. AFAIK, comparing desired spec with reconciled/stable spec is currently the only way to determine if all changes have been applied. If the specs don't match, how can that be achieved?





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #489: [FLINK-30406] Detect when jobmanager never started

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #489:
URL: https://github.com/apache/flink-kubernetes-operator/pull/489#discussion_r1053360675


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -116,9 +119,23 @@ public final void reconcile(CR cr, Context<?> ctx) throws Exception {
         if (reconciliationStatus.isBeforeFirstDeployment()) {
             LOG.info("Deploying for the first time");
 
+            if (spec.getJob() != null) {
+                var initialUpgradeMode = UpgradeMode.STATELESS;
+                var initialSp = spec.getJob().getInitialSavepointPath();
+
+                if (initialSp != null) {
+                    status.getJobStatus()
+                            .getSavepointInfo()
+                            .setLastSavepoint(
+                                    Savepoint.of(initialSp, SavepointTriggerType.UNKNOWN));
+                    initialUpgradeMode = UpgradeMode.SAVEPOINT;
+                }
+
+                spec.getJob().setUpgradeMode(initialUpgradeMode);

Review Comment:
   We already store modified versions of the spec as the operator reconciles the resource. We already had similar logic for suspend/upgrade/savepoints etc.
   
   This is not a problem because we do not use a naive equality check to test whether user-requested changes have been applied. Instead, we determine the diff and ignore changes that do not result in an upgrade but only affect reconciliation (such as upgrade mode and operator configs).
   
   Since this is already done, works without any issues, and is actually a very clean solution, I would suggest we not change it.
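
   As a rough illustration of the idea (not the operator's actual diff logic), the comparison can be thought of as follows. The sketch assumes the spec is flattened into string key/value pairs; `SpecDiffSketch`, `RECONCILIATION_ONLY`, `requiresUpgrade`, and the field names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: compare two flattened specs, ignoring reconciliation-only fields. */
final class SpecDiffSketch {

    // Assumed field names; changes to these alone should not trigger an upgrade.
    static final Set<String> RECONCILIATION_ONLY = Set.of("job.upgradeMode");

    /** Returns true only if the specs differ in a field that is not reconciliation-only. */
    static boolean requiresUpgrade(Map<String, String> lastReconciled, Map<String, String> desired) {
        Map<String, String> a = new HashMap<>(lastReconciled);
        Map<String, String> b = new HashMap<>(desired);
        // Drop the ignored fields from both copies before comparing.
        a.keySet().removeAll(RECONCILIATION_ONLY);
        b.keySet().removeAll(RECONCILIATION_ONLY);
        return !a.equals(b);
    }

    public static void main(String[] args) {
        Map<String, String> reconciled = Map.of("image", "flink:1.16", "job.upgradeMode", "savepoint");
        Map<String, String> userSpec = Map.of("image", "flink:1.16", "job.upgradeMode", "last-state");
        Map<String, String> newImage = Map.of("image", "flink:1.17", "job.upgradeMode", "last-state");

        System.out.println(requiresUpgrade(reconciled, userSpec)); // false
        System.out.println(requiresUpgrade(reconciled, newImage)); // true
    }
}
```

   With a comparison like this, the reconciled spec can record the upgrade mode that was actually applied without making the resource look permanently out of sync with the user's spec.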





[GitHub] [flink-kubernetes-operator] gyfora merged pull request #489: [FLINK-30406] Detect when jobmanager never started

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #489:
URL: https://github.com/apache/flink-kubernetes-operator/pull/489




[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #489: [FLINK-30406] Detect when jobmanager never started

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #489:
URL: https://github.com/apache/flink-kubernetes-operator/pull/489#issuecomment-1366390439

   +1 nice job @gyfora !




[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #489: [FLINK-30406] Detect when jobmanager never started

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #489:
URL: https://github.com/apache/flink-kubernetes-operator/pull/489#discussion_r1058069736


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java:
##########
@@ -112,18 +116,23 @@ protected Optional<UpgradeMode> getAvailableUpgradeMode(
                 && !flinkVersionChanged(
                         ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
 
-            if (!flinkService.isHaMetadataAvailable(deployConfig)) {
-                if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) {
-                    // initial deployment failure, reset to allow for spec change to proceed
-                    return resetOnMissingStableSpec(deployment, deployConfig);
-                }
-            } else {
+            if (flinkService.isHaMetadataAvailable(deployConfig)) {
                 LOG.info(
                         "Job is not running but HA metadata is available for last state restore, ready for upgrade");
                 return Optional.of(UpgradeMode.LAST_STATE);
             }
         }
 
+        if (status.getReconciliationStatus()
+                                .deserializeLastReconciledSpec()
+                                .getJob()
+                                .getUpgradeMode()
+                        != UpgradeMode.LAST_STATE
+                && FlinkUtils.jmPodNeverStarted(ctx)) {
+            deleteJmThatNeverStarted(deployment, deployConfig);
+            return getAvailableUpgradeMode(deployment, ctx, deployConfig, observeConfig);

Review Comment:
   Can the process get stuck here in a recursive loop?





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #489: [FLINK-30406] Detect when jobmanager never started

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #489:
URL: https://github.com/apache/flink-kubernetes-operator/pull/489#discussion_r1053989089


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java:
##########
@@ -260,11 +265,96 @@ private FlinkDeployment cloneDeploymentWithUpgradeMode(
     }
 
     @ParameterizedTest
-    @EnumSource(UpgradeMode.class)
-    public void testUpgradeBeforeReachingStableSpec(UpgradeMode upgradeMode) throws Exception {
+    @MethodSource("testUpgradeJmDeployCannotStartParams")
+    public void testUpgradeJmDeployCannotStart(UpgradeMode fromMode, UpgradeMode toMode)
+            throws Exception {

Review Comment:
   This test is indeed a bit too complex. I will try to add some extra methods/comments to make it clearer.





[GitHub] [flink-kubernetes-operator] tweise commented on a diff in pull request #489: [FLINK-30406] Detect when jobmanager never started

Posted by GitBox <gi...@apache.org>.
tweise commented on code in PR #489:
URL: https://github.com/apache/flink-kubernetes-operator/pull/489#discussion_r1053912124


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java:
##########
@@ -116,9 +119,23 @@ public final void reconcile(CR cr, Context<?> ctx) throws Exception {
         if (reconciliationStatus.isBeforeFirstDeployment()) {
             LOG.info("Deploying for the first time");
 
+            if (spec.getJob() != null) {
+                var initialUpgradeMode = UpgradeMode.STATELESS;
+                var initialSp = spec.getJob().getInitialSavepointPath();
+
+                if (initialSp != null) {
+                    status.getJobStatus()
+                            .getSavepointInfo()
+                            .setLastSavepoint(
+                                    Savepoint.of(initialSp, SavepointTriggerType.UNKNOWN));
+                    initialUpgradeMode = UpgradeMode.SAVEPOINT;
+                }
+
+                spec.getJob().setUpgradeMode(initialUpgradeMode);

Review Comment:
   Makes sense. Agreed that it must be possible to find the upgrade mode that was actually used. And since an upgrade mode change does not cause reconciliation, there is no need to consider it when comparing the specs from the user side either. That a new mode was applied is evident from its presence in the latest spec.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #489: [FLINK-30406] Detect when jobmanager never started

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #489:
URL: https://github.com/apache/flink-kubernetes-operator/pull/489#discussion_r1055881354


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java:
##########
@@ -260,11 +265,96 @@ private FlinkDeployment cloneDeploymentWithUpgradeMode(
     }
 
     @ParameterizedTest
-    @EnumSource(UpgradeMode.class)
-    public void testUpgradeBeforeReachingStableSpec(UpgradeMode upgradeMode) throws Exception {
+    @MethodSource("testUpgradeJmDeployCannotStartParams")
+    public void testUpgradeJmDeployCannotStart(UpgradeMode fromMode, UpgradeMode toMode)
+            throws Exception {

Review Comment:
   I have added some more comments to the tests, @morhidi. I could not find a good way to break it up into methods; I feel that would just add more complexity.
   
   These tests are pretty generic in the sense that they test different upgradeMode/savepoint setting combinations. While that helps us cover all cases in a robust way, it makes it a little difficult to simplify beyond a point.


