Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/19 12:54:44 UTC

[GitHub] [flink-kubernetes-operator] Aitozi opened a new pull request #85: [FLINK-26538] Ability to restart deployment

Aitozi opened a new pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85


   This PR adds the ability to restart a deployment with a one-shot command.
   
   **Brief change**
   
   - Add `restartNonce` in the `FlinkDeployment#spec`
   - `JobReconciler` reacts to a change of the `restartNonce` by restarting the Flink deployment.
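   A minimal sketch of how a reconciler could detect a `restartNonce` change. It assumes the nonce in the desired spec is compared with the nonce in the last reconciled spec. `SpecSnapshot` and `RestartNonceCheck` are hypothetical names for illustration, not the operator's actual classes.

```java
import java.util.Objects;

/** Hypothetical, simplified view of the spec field relevant to a restart. */
final class SpecSnapshot {
    final Long restartNonce; // null when the user never set a nonce

    SpecSnapshot(Long restartNonce) {
        this.restartNonce = restartNonce;
    }
}

final class RestartNonceCheck {
    private RestartNonceCheck() {}

    /**
     * Returns true when the desired spec carries a nonce that differs from the last
     * reconciled one. A null lastReconciled means the deployment has never been
     * reconciled, so there is nothing to restart yet.
     */
    static boolean shouldRestart(SpecSnapshot desired, SpecSnapshot lastReconciled) {
        if (lastReconciled == null || desired.restartNonce == null) {
            return false;
        }
        return !Objects.equals(desired.restartNonce, lastReconciled.restartNonce);
    }
}
```

   With this shape, re-applying the same nonce value is a no-op, while bumping it (for example from 1 to 2) requests exactly one more restart.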
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora merged pull request #85: [FLINK-26538] Add ability to restart flink deployment

gyfora merged pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85


   





[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

gyfora commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835880421



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
##########
@@ -245,6 +241,19 @@
 
         JobSpec oldJob = oldSpec.getJob();
         JobSpec newJob = newSpec.getJob();
+
+        if (lastReconciledSpec(deployment) != null
+                && ReconciliationUtils.triggerRestart(deployment)) {
+            if (newJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job to %s state", newJob.getState()));
+            }
+            if (oldJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job in %s state", oldJob.getState()));
+            }
+        }

Review comment:
       Nothing will happen, but why would anything happen anyway :) Without the validation, the `restartNonce` will behave like any other spec change.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

gyfora commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835896883



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
##########
@@ -245,6 +241,19 @@
 
         JobSpec oldJob = oldSpec.getJob();
         JobSpec newJob = newSpec.getJob();
+
+        if (lastReconciledSpec(deployment) != null
+                && ReconciliationUtils.triggerRestart(deployment)) {
+            if (newJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job to %s state", newJob.getState()));
+            }
+            if (oldJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job in %s state", oldJob.getState()));
+            }
+        }

Review comment:
       All I am trying to say is that, the way it is implemented, changing the `restartNonce` field works like any other spec change and triggers a simple upgrade of the deployment (this is perfectly good).
   
   Given this, I don't see the point in introducing restrictions on this field, as we also don't have restrictions on when an upgrade can be triggered. I just want to keep it consistent and simple.
   
   We can of course wait for others to chime in here as well:
   @wangyang0918 @morhidi @tweise 
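   A minimal sketch of the "works like any other spec change" behaviour: if the nonce is simply one more field in the spec's `equals`, a plain diff against the last reconciled spec is enough to trigger an upgrade. `DeploymentSpecSketch` and `UpgradeDecision` are hypothetical stand-ins; the real spec has many more fields.

```java
import java.util.Objects;

/** Hypothetical, trimmed-down deployment spec with one ordinary field plus the nonce. */
final class DeploymentSpecSketch {
    final String image;
    final Long restartNonce;

    DeploymentSpecSketch(String image, Long restartNonce) {
        this.image = image;
        this.restartNonce = restartNonce;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DeploymentSpecSketch)) return false;
        DeploymentSpecSketch other = (DeploymentSpecSketch) o;
        // The nonce takes part in equality exactly like any other field.
        return Objects.equals(image, other.image)
                && Objects.equals(restartNonce, other.restartNonce);
    }

    @Override
    public int hashCode() {
        return Objects.hash(image, restartNonce);
    }
}

final class UpgradeDecision {
    private UpgradeDecision() {}

    /** Any difference from the last reconciled spec, including the nonce alone, triggers an upgrade. */
    static boolean needsUpgrade(DeploymentSpecSketch desired, DeploymentSpecSketch lastReconciled) {
        return lastReconciled != null && !desired.equals(lastReconciled);
    }
}
```

   The design choice here is that no restart-specific code path exists at all; the nonce only exists to make the spec "different" without touching real settings.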







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Aitozi commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r830598979



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -114,6 +119,14 @@ public void reconcile(FlinkDeployment flinkApp, Context context, Configuration e
         }
     }
 
+    private void restart(FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception {
+        LOG.info("Restart application cluster now.");
+        String jobIdString = flinkApp.getStatus().getJobStatus().getJobId();
+        JobID jobID = jobIdString != null ? JobID.fromHexString(jobIdString) : null;
+        flinkService.cancelJob(jobID, UpgradeMode.LAST_STATE, effectiveConfig);

Review comment:
       Yes. IMO, restart semantics mean the job keeps running from the latest checkpoint, so the HA config is retained.







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Aitozi commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835916020



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
##########
@@ -245,6 +241,19 @@
 
         JobSpec oldJob = oldSpec.getJob();
         JobSpec newJob = newSpec.getJob();
+
+        if (lastReconciledSpec(deployment) != null
+                && ReconciliationUtils.triggerRestart(deployment)) {
+            if (newJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job to %s state", newJob.getState()));
+            }
+            if (oldJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job in %s state", oldJob.getState()));
+            }
+        }

Review comment:
       @gyfora @wangyang0918 Thanks for your suggestions. I have removed the validation accordingly.







[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #85: [FLINK-26538] Add ability to restart flink deployment

Aitozi commented on pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#issuecomment-1079846854


   After this change, this PR only introduces the `restartNonce` spec field; when it changes, the upgrade logic is reused to restart the job. cc @wangyang0918 @gyfora PTAL again.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Aitozi commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835895984



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
##########
@@ -245,6 +241,19 @@
 
         JobSpec oldJob = oldSpec.getJob();
         JobSpec newJob = newSpec.getJob();
+
+        if (lastReconciledSpec(deployment) != null
+                && ReconciliationUtils.triggerRestart(deployment)) {
+            if (newJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job to %s state", newJob.getState()));
+            }
+            if (oldJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job in %s state", oldJob.getState()));
+            }
+        }

Review comment:
       About removing the validation, I'd like to keep it until Monday to wait for more input. I don't insist on it; I just want to hear more opinions :)







[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #85: [WIP][FLINK-26538] Add ability to restart flink deployment

Aitozi commented on pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#issuecomment-1077462588


   > #103
   
   OK, I will take a look.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Aitozi commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r830793222



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
##########
@@ -65,6 +65,22 @@ public static void updateForReconciliationError(FlinkDeployment flinkApp, String
         reconciliationStatus.setError(err);
     }
 
+    public static boolean triggerRestart(FlinkDeployment flinkApp) {
+        Long restartNonce = flinkApp.getSpec().getRestartNonce();
+        boolean everReconcileSucceed =

Review comment:
       Oh, yes. Thanks for pointing that out.







[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #85: [FLINK-26538] Add ability to restart flink deployment

Aitozi commented on pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#issuecomment-1073007668


   cc @wangyang0918 @gyfora @tweise PTAL.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Aitozi commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r830599461



##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
##########
@@ -199,6 +200,38 @@ public void triggerSavepoint() throws Exception {
         assertNull(spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
     }
 
+    @Test
+    public void triggerRestart() throws Exception {
+        Context context = TestUtils.createContextWithReadyJobManagerDeployment();
+        TestingFlinkService flinkService = new TestingFlinkService();
+
+        JobReconciler reconciler = new JobReconciler(null, flinkService, operatorConfiguration);
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+
+        reconciler.reconcile(deployment, context, config);
+        List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
+        verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+        long jobStartTime = runningJobs.get(0).f1.getStartTime();
+
+        // Test restart job
+        FlinkDeployment restartJob = ReconciliationUtils.clone(deployment);
+        restartJob.getSpec().setRestartNonce(1L);
+        reconciler.reconcile(restartJob, context, config);
+
+        runningJobs = flinkService.listJobs();
+        assertEquals(1, runningJobs.size());
+        long newJobStartTime = runningJobs.get(0).f1.getStartTime();
+        Assertions.assertTrue(newJobStartTime > jobStartTime);

Review comment:
       Good catch, removed.







[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #85: [WIP][FLINK-26538] Add ability to restart flink deployment

Aitozi commented on pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#issuecomment-1075311636


   @wangyang0918 I think the `restartNonce` should also respect the `UpgradeMode`, so I should avoid cancelling the job with `LAST_STATE` mode directly. If other parts (e.g. `.image`) also changed, they will take effect after the restart.
   
   Besides, we should also validate that if HA is not enabled, we do not trigger a restart in `LAST_STATE` mode. What do you think?
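   A minimal sketch of the proposed HA check, assuming HA counts as enabled when Flink's `high-availability` option is set to anything other than `NONE`. `UpgradeModeSketch` mirrors the operator's upgrade mode names locally, and `RestartValidationSketch` is a hypothetical helper; the operator's real validator may inspect the configuration differently.

```java
import java.util.Map;
import java.util.Optional;

/** Local mirror of the operator's upgrade modes, redefined to keep the sketch self-contained. */
enum UpgradeModeSketch { STATELESS, SAVEPOINT, LAST_STATE }

final class RestartValidationSketch {
    // Assumption: HA is enabled when "high-availability" is present and not NONE.
    static final String HA_KEY = "high-availability";

    private RestartValidationSketch() {}

    static boolean isHaEnabled(Map<String, String> flinkConf) {
        String mode = flinkConf.getOrDefault(HA_KEY, "NONE");
        return !mode.trim().equalsIgnoreCase("NONE");
    }

    /** Returns an error message when a LAST_STATE restart is requested without HA. */
    static Optional<String> validateRestart(UpgradeModeSketch mode, Map<String, String> flinkConf) {
        if (mode == UpgradeModeSketch.LAST_STATE && !isHaEnabled(flinkConf)) {
            return Optional.of("LAST_STATE restart requires high availability to be enabled");
        }
        return Optional.empty();
    }
}
```

   Returning `Optional<String>` matches the style of the validator diff quoted in this thread, where an empty result means "valid".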


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

wangyang0918 commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835903502



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
##########
@@ -245,6 +241,19 @@
 
         JobSpec oldJob = oldSpec.getJob();
         JobSpec newJob = newSpec.getJob();
+
+        if (lastReconciledSpec(deployment) != null
+                && ReconciliationUtils.triggerRestart(deployment)) {
+            if (newJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job to %s state", newJob.getState()));
+            }
+            if (oldJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job in %s state", oldJob.getState()));
+            }
+        }

Review comment:
       I lean towards making `restartNonce` work as a normal field, without special validation.







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Aitozi commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835915938



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -202,17 +200,19 @@ private void printCancelLogs(UpgradeMode upgradeMode) {
                 effectiveConfig);
     }
 
-    private void suspendJob(
+    private JobState suspendJob(
             FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration effectiveConfig)
             throws Exception {
         final Optional<String> savepointOpt =
                 internalSuspendJob(flinkApp, upgradeMode, effectiveConfig);
 
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
-        jobStatus.setState(JobState.SUSPENDED.name());

Review comment:
       Got it.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

bgeng777 commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r830602054



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -114,6 +119,14 @@ public void reconcile(FlinkDeployment flinkApp, Context context, Configuration e
         }
     }
 
+    private void restart(FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception {
+        LOG.info("Restart application cluster now.");
+        String jobIdString = flinkApp.getStatus().getJobStatus().getJobId();
+        JobID jobID = jobIdString != null ? JobID.fromHexString(jobIdString) : null;
+        flinkService.cancelJob(jobID, UpgradeMode.LAST_STATE, effectiveConfig);

Review comment:
       I rechecked the JIRA: the restart should be a shortcut for `suspend -> delete current deployment -> restart job from latest state in a new deployment`, right? If that is the case, I think your code works fine.
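   A minimal sketch of the three-step sequence described in the comment, written against a hypothetical `ClusterOps` interface so the ordering can be checked with a recording test double. None of these names correspond to the operator's `FlinkService` API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical cluster operations used only to illustrate the restart order. */
interface ClusterOps {
    void suspendJob();            // stop the job while keeping state needed for recovery
    void deleteDeployment();      // remove the current cluster deployment
    void deployFromLatestState(); // create a new deployment that resumes from the latest state
}

final class RestartSequence {
    private RestartSequence() {}

    /** Restart as a shortcut for: suspend -> delete deployment -> redeploy from latest state. */
    static void restart(ClusterOps ops) {
        ops.suspendJob();
        ops.deleteDeployment();
        ops.deployFromLatestState();
    }
}

/** Test double that records the order in which operations are invoked. */
final class RecordingOps implements ClusterOps {
    final List<String> calls = new ArrayList<>();

    @Override public void suspendJob() { calls.add("suspend"); }
    @Override public void deleteDeployment() { calls.add("delete"); }
    @Override public void deployFromLatestState() { calls.add("deploy"); }
}
```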







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Aitozi commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835856230



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -202,17 +200,19 @@ private void printCancelLogs(UpgradeMode upgradeMode) {
                 effectiveConfig);
     }
 
-    private void suspendJob(
+    private JobState suspendJob(
             FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration effectiveConfig)
             throws Exception {
         final Optional<String> savepointOpt =
                 internalSuspendJob(flinkApp, upgradeMode, effectiveConfig);
 
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
-        jobStatus.setState(JobState.SUSPENDED.name());

Review comment:
       I wonder whether we need to set `JobStatus#state` to `SUSPENDED` here. One is the observed state, the other is the desired state. Maybe we could just clear the state and let the observer sync it in the next reconcile?







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

wangyang0918 commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835908545



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -202,17 +200,19 @@ private void printCancelLogs(UpgradeMode upgradeMode) {
                 effectiveConfig);
     }
 
-    private void suspendJob(
+    private JobState suspendJob(
             FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration effectiveConfig)
             throws Exception {
         final Optional<String> savepointOpt =
                 internalSuspendJob(flinkApp, upgradeMode, effectiveConfig);
 
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
-        jobStatus.setState(JobState.SUSPENDED.name());

Review comment:
       I prefer to update the status as early as possible.
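   A minimal sketch of "update the status as early as possible": the suspend routine writes `SUSPENDED` into the status immediately after the cancel call and also returns it, instead of leaving the field for the observer to fill in on a later reconcile. `JobStateSketch`, `JobStatusSketch` and `SuspendSketch` are hypothetical stand-ins for the operator's types.

```java
/** Local mirror of the job states discussed in the thread. */
enum JobStateSketch { RUNNING, SUSPENDED }

/** Hypothetical mutable job status, standing in for the operator's JobStatus. */
final class JobStatusSketch {
    String state = JobStateSketch.RUNNING.name();
}

final class SuspendSketch {
    private SuspendSketch() {}

    /**
     * Runs the cancel action, then records SUSPENDED on the status right away
     * and returns the new state to the caller.
     */
    static JobStateSketch suspendJob(JobStatusSketch status, Runnable cancelAction) {
        cancelAction.run(); // stand-in for the real cancel/suspend call
        status.state = JobStateSketch.SUSPENDED.name();
        return JobStateSketch.SUSPENDED;
    }
}
```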







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

wangyang0918 commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r830771983



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -114,6 +119,14 @@ public void reconcile(FlinkDeployment flinkApp, Context context, Configuration e
         }
     }
 
+    private void restart(FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception {
+        LOG.info("Restart application cluster now.");
+        String jobIdString = flinkApp.getStatus().getJobStatus().getJobId();
+        JobID jobID = jobIdString != null ? JobID.fromHexString(jobIdString) : null;

Review comment:
       Since we always `cancelJob` with `LAST_STATE`, do we really need to get the `jobID` here?

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
##########
@@ -65,6 +65,22 @@ public static void updateForReconciliationError(FlinkDeployment flinkApp, String
         reconciliationStatus.setError(err);
     }
 
+    public static boolean triggerRestart(FlinkDeployment flinkApp) {
+        Long restartNonce = flinkApp.getSpec().getRestartNonce();
+        boolean everReconcileSucceed =

Review comment:
       IIRC, `triggerRestart` is never called when `lastReconciledSpec` is null, since `JobReconciler#reconcile()` returns directly in that case.
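   A minimal sketch of the guard described above, assuming the reconciler deploys and returns early when nothing has been reconciled yet, so the restart check can rely on a non-null last reconciled spec. `LastSpec` and `ReconcileGuardSketch` are hypothetical names, and the string results only label which path was taken.

```java
import java.util.Objects;

/** Hypothetical holder for the last reconciled spec; a null reference means never reconciled. */
final class LastSpec {
    final Long restartNonce;

    LastSpec(Long restartNonce) {
        this.restartNonce = restartNonce;
    }
}

final class ReconcileGuardSketch {
    private ReconcileGuardSketch() {}

    static String reconcile(Long desiredNonce, LastSpec lastReconciled) {
        if (lastReconciled == null) {
            // First pass: deploy and return before any restart logic runs.
            return "deploy";
        }
        // lastReconciled is non-null from here on, so triggerRestart needs no null guard.
        return triggerRestart(desiredNonce, lastReconciled) ? "restart" : "no-op";
    }

    private static boolean triggerRestart(Long desiredNonce, LastSpec lastReconciled) {
        return desiredNonce != null && !Objects.equals(desiredNonce, lastReconciled.restartNonce);
    }
}
```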







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Aitozi commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835883766



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
##########
@@ -245,6 +241,19 @@
 
         JobSpec oldJob = oldSpec.getJob();
         JobSpec newJob = newSpec.getJob();
+
+        if (lastReconciledSpec(deployment) != null
+                && ReconciliationUtils.triggerRestart(deployment)) {
+            if (newJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job to %s state", newJob.getState()));
+            }
+            if (oldJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job in %s state", oldJob.getState()));
+            }
+        }

Review comment:
       > but why would anything happen anyways
   
   I think the `restartNonce` has the semantics of bringing the job to a `start/running` status somehow? Without this check, it will work like asking the job to restart once while it stays suspended :).
   
   > we could discuss whether we should allow spec changes in suspended state
   
   If the JobState goes from suspended to suspended along with other spec changes, those changes will not take effect, so I think allowing the change does no harm. The only problem with changing the `restartNonce` field in suspended state is a semantic one, as I said first.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

gyfora commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835884452



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
##########
@@ -245,6 +241,19 @@
 
         JobSpec oldJob = oldSpec.getJob();
         JobSpec newJob = newSpec.getJob();
+
+        if (lastReconciledSpec(deployment) != null
+                && ReconciliationUtils.triggerRestart(deployment)) {
+            if (newJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job to %s state", newJob.getState()));
+            }
+            if (oldJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job in %s state", oldJob.getState()));
+            }
+        }

Review comment:
       I think that, the way you implemented it, the `restartNonce` will not change the desired state. And I think it's good like this. This is just a field to trigger a restart without having to change the actual spec.







[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #85: [WIP][FLINK-26538] Add ability to restart flink deployment

Aitozi commented on pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#issuecomment-1079690527


   I'm working on this now.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Aitozi commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835878018



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
##########
@@ -245,6 +241,19 @@
 
         JobSpec oldJob = oldSpec.getJob();
         JobSpec newJob = newSpec.getJob();
+
+        if (lastReconciledSpec(deployment) != null
+                && ReconciliationUtils.triggerRestart(deployment)) {
+            if (newJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job to %s state", newJob.getState()));
+            }
+            if (oldJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job in %s state", oldJob.getState()));
+            }
+        }

Review comment:
       This validation tries to guarantee that the job targets the `RUNNING` state, and is also in the `RUNNING` state, when the user tries to use the `restartNonce`. `RUNNING` here is the desired state of the job, not the observed state. Otherwise, what should we do if the user changes the `restartNonce` while the job is in the `SUSPENDED` state?







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835880531



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
##########
@@ -245,6 +241,19 @@
 
         JobSpec oldJob = oldSpec.getJob();
         JobSpec newJob = newSpec.getJob();
+
+        if (lastReconciledSpec(deployment) != null
+                && ReconciliationUtils.triggerRestart(deployment)) {
+            if (newJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job to %s state", newJob.getState()));
+            }
+            if (oldJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job in %s state", oldJob.getState()));
+            }
+        }

Review comment:
       Instead of adding this specific validation for the `restartNonce` field, we could discuss whether we should allow spec changes in the suspended state. I think it is fine.







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r830602759



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -114,6 +119,14 @@ public void reconcile(FlinkDeployment flinkApp, Context context, Configuration e
         }
     }
 
+    private void restart(FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception {
+        LOG.info("Restart application cluster now.");
+        String jobIdString = flinkApp.getStatus().getJobStatus().getJobId();
+        JobID jobID = jobIdString != null ? JobID.fromHexString(jobIdString) : null;
+        flinkService.cancelJob(jobID, UpgradeMode.LAST_STATE, effectiveConfig);

Review comment:
       Yes, it's exactly as you said.







[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r830584934



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -114,6 +119,14 @@ public void reconcile(FlinkDeployment flinkApp, Context context, Configuration e
         }
     }
 
+    private void restart(FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception {
+        LOG.info("Restart application cluster now.");
+        String jobIdString = flinkApp.getStatus().getJobStatus().getJobId();
+        JobID jobID = jobIdString != null ? JobID.fromHexString(jobIdString) : null;
+        flinkService.cancelJob(jobID, UpgradeMode.LAST_STATE, effectiveConfig);

Review comment:
       IIUC, the reason for using `flinkService.cancelJob` with `LAST_STATE` mode is to delete the cluster directly?
   But in `LAST_STATE` mode, we will not delete the HA config. Is that expected? 
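The question hinges on what cancellation does under each upgrade mode. The following is a simplified model, not the operator's `FlinkService.cancelJob`; `CancelPlan` and `planFor` are hypothetical. The STATELESS and SAVEPOINT rows assume the job is terminated through Flink itself (cancel or stop-with-savepoint), after which Flink cleans up its own HA data. The LAST_STATE row encodes what the comment describes: the cluster is deleted directly, so the HA metadata survives and no job ID is needed.

```java
class CancelPlanSketch {
    enum UpgradeMode { STATELESS, SAVEPOINT, LAST_STATE }

    /** Hypothetical summary of what "cancel" means for one upgrade mode. */
    static final class CancelPlan {
        final String action;
        final boolean needsJobId; // does the action address one specific job?
        final boolean keepsHaMetadata; // is HA data left behind for the next deployment?

        CancelPlan(String action, boolean needsJobId, boolean keepsHaMetadata) {
            this.action = action;
            this.needsJobId = needsJobId;
            this.keepsHaMetadata = keepsHaMetadata;
        }
    }

    static CancelPlan planFor(UpgradeMode mode) {
        switch (mode) {
            case STATELESS:
                // Assumption: the cancelled job reaches a terminal state and its HA data is dropped.
                return new CancelPlan("cancel job", true, false);
            case SAVEPOINT:
                // Assumption: same as above after stop-with-savepoint; state lives in the savepoint.
                return new CancelPlan("stop job with savepoint", true, false);
            case LAST_STATE:
                // Delete the cluster without stopping the job, keeping the HA data
                // (and with it the latest checkpoint pointer) for the redeployment.
                return new CancelPlan("delete cluster deployment", false, true);
            default:
                throw new IllegalArgumentException("Unsupported upgrade mode " + mode);
        }
    }
}
```

Under this model, keeping the HA config in LAST_STATE mode is what lets the restarted job resume from its latest checkpoint. It would also explain why the job ID is unused on that path.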
   







[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r830792716



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -114,6 +119,14 @@ public void reconcile(FlinkDeployment flinkApp, Context context, Configuration e
         }
     }
 
+    private void restart(FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception {
+        LOG.info("Restart application cluster now.");
+        String jobIdString = flinkApp.getStatus().getJobStatus().getJobId();
+        JobID jobID = jobIdString != null ? JobID.fromHexString(jobIdString) : null;

Review comment:
       Not necessary; I will replace it with `null`.







[GitHub] [flink-kubernetes-operator] Aitozi edited a comment on pull request #85: [WIP][FLINK-26538] Add ability to restart flink deployment

Posted by GitBox <gi...@apache.org>.
Aitozi edited a comment on pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#issuecomment-1077462588


   > I would suggest we use the mechanism that I implemented in https://github.com/apache/flink-kubernetes-operator/pull/103 to simply transition to SUSPENDED when restarting; the next reconciliation loop will then take care of the redeployment.
   
   OK, I will take a look.





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #85: [WIP][FLINK-26538] Add ability to restart flink deployment

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#issuecomment-1077459160


   I would suggest we use the mechanism that I implemented in https://github.com/apache/flink-kubernetes-operator/pull/103 to simply transition to SUSPENDED when restarting; the next reconciliation loop will then take care of the redeployment.
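The suggested flow can be modelled as a two-step state machine. One reconciliation pass reacts to the new nonce by suspending the job. The following pass notices that the desired state is `RUNNING` while the job is `SUSPENDED` and redeploys it. This is a toy sketch: `Deployment`, `reconcile` and the returned action labels are hypothetical and are not taken from PR #103. A real implementation would also suspend according to the configured upgrade mode.

```java
import java.util.Objects;

class TwoPhaseRestartSketch {
    enum JobState { RUNNING, SUSPENDED }

    /** Hypothetical, heavily simplified deployment: desired spec fields plus observed status. */
    static final class Deployment {
        JobState desiredState = JobState.RUNNING;
        Long restartNonce; // desired nonce from the spec
        Long lastReconciledNonce; // nonce the operator last acted upon
        JobState observedState = JobState.RUNNING;
        int deployments = 1;
    }

    /** One reconciliation pass; returns a label for the action it took. */
    static String reconcile(Deployment d) {
        if (!Objects.equals(d.restartNonce, d.lastReconciledNonce)) {
            // Record the nonce first so the same request is never handled twice.
            d.lastReconciledNonce = d.restartNonce;
            if (d.observedState == JobState.RUNNING) {
                // Step 1: only suspend; redeployment is left to the next pass.
                d.observedState = JobState.SUSPENDED;
                return "suspended";
            }
        }
        if (d.desiredState == JobState.RUNNING && d.observedState == JobState.SUSPENDED) {
            // Step 2: desired RUNNING vs. observed SUSPENDED triggers a regular deployment.
            d.observedState = JobState.RUNNING;
            d.deployments++;
            return "redeployed";
        }
        return "noop";
    }
}
```

Start from a running deployment and set `restartNonce = 1L`. Calling `reconcile` three times then returns `suspended`, `redeployed` and `noop`. The restart therefore reuses the ordinary deploy path instead of needing a dedicated restart routine.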





[GitHub] [flink-kubernetes-operator] bgeng777 commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r830582208



##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
##########
@@ -199,6 +200,38 @@ public void triggerSavepoint() throws Exception {
         assertNull(spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
     }
 
+    @Test
+    public void triggerRestart() throws Exception {
+        Context context = TestUtils.createContextWithReadyJobManagerDeployment();
+        TestingFlinkService flinkService = new TestingFlinkService();
+
+        JobReconciler reconciler = new JobReconciler(null, flinkService, operatorConfiguration);
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        Configuration config = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
+
+        reconciler.reconcile(deployment, context, config);
+        List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
+        verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+        long jobStartTime = runningJobs.get(0).f1.getStartTime();
+
+        // Test restart job
+        FlinkDeployment restartJob = ReconciliationUtils.clone(deployment);
+        restartJob.getSpec().setRestartNonce(1L);
+        reconciler.reconcile(restartJob, context, config);
+
+        runningJobs = flinkService.listJobs();
+        assertEquals(1, runningJobs.size());
+        long newJobStartTime = runningJobs.get(0).f1.getStartTime();
+        Assertions.assertTrue(newJobStartTime > jobStartTime);

Review comment:
       nit: it looks like this file already imports `org.junit.jupiter.api.Assertions.assertEquals` and `org.junit.jupiter.api.Assertions.assertTrue`, so we can use these asserts without the `Assertions.` prefix.







[GitHub] [flink-kubernetes-operator] gyfora commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835870649



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
##########
@@ -245,6 +241,19 @@
 
         JobSpec oldJob = oldSpec.getJob();
         JobSpec newJob = newSpec.getJob();
+
+        if (lastReconciledSpec(deployment) != null
+                && ReconciliationUtils.triggerRestart(deployment)) {
+            if (newJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job to %s state", newJob.getState()));
+            }
+            if (oldJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job in %s state", oldJob.getState()));
+            }
+        }

Review comment:
       I think we can remove these checks together with `ReconciliationUtils.triggerRestart(deployment)`. I don't think they achieve anything, and they actually prevent the user from restarting a failing job.







[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835908862



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -202,17 +200,19 @@ private void printCancelLogs(UpgradeMode upgradeMode) {
                 effectiveConfig);
     }
 
-    private void suspendJob(
+    private JobState suspendJob(
             FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration effectiveConfig)
             throws Exception {
         final Optional<String> savepointOpt =
                 internalSuspendJob(flinkApp, upgradeMode, effectiveConfig);
 
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
-        jobStatus.setState(JobState.SUSPENDED.name());

Review comment:
       I prefer to update the status as early as possible since we are pretty sure the job has already been cancelled.
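Here is a minimal sketch of the ordering preferred here. It uses hypothetical `JobStatus` and cancellation types rather than the operator's own. The status is switched to `SUSPENDED` inside `suspendJob` as soon as the cancellation call returns. It is not left to the caller that receives the returned `JobState`.

```java
import java.util.Optional;
import java.util.function.Supplier;

class SuspendJobSketch {
    enum JobState { RUNNING, SUSPENDED }

    /** Hypothetical status holder standing in for the operator's JobStatus. */
    static final class JobStatus {
        String state = JobState.RUNNING.name();
        String lastSavepoint;
    }

    /**
     * Hypothetical suspend step. The supplier performs the cancellation and returns a
     * savepoint path if one was taken (for example in savepoint upgrade mode).
     */
    static JobState suspendJob(JobStatus status, Supplier<Optional<String>> cancel) {
        Optional<String> savepointOpt = cancel.get();
        // The job is known to be cancelled at this point, so record it immediately.
        status.state = JobState.SUSPENDED.name();
        savepointOpt.ifPresent(path -> status.lastSavepoint = path);
        return JobState.SUSPENDED;
    }
}
```

Returning the state is still useful for the caller's control flow. With the early update, however, the status never lags behind the cancellation that already happened.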







[GitHub] [flink-kubernetes-operator] Aitozi commented on pull request #85: [FLINK-26538] Add ability to restart flink deployment

Posted by GitBox <gi...@apache.org>.
Aitozi commented on pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#issuecomment-1073535163


   @wangyang0918 Thanks for your review. I will think over your two concerns a bit more.





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #85: [WIP][FLINK-26538] Add ability to restart flink deployment

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#issuecomment-1075919509


   I agree that `restartNonce` should respect the `UpgradeMode` and try to reuse the current spec change behavior.
   
   HA is not always necessary for `restartNonce`. We already do the sanity check in the webhook when using `last-state`. However, we might still lose state in some corner scenarios; this will be covered by FLINK-26577.
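The webhook sanity check mentioned here can be pictured as follows. This is a hedged sketch rather than the operator's validator; `validateUpgradeMode` and its error message are hypothetical. The only real detail it relies on is Flink's `high-availability` option, whose default `NONE` means HA is disabled.

```java
import java.util.Map;
import java.util.Optional;

class LastStateHaCheckSketch {
    enum UpgradeMode { STATELESS, SAVEPOINT, LAST_STATE }

    /** Flink configuration key selecting the HA services; it defaults to NONE. */
    static final String HA_MODE_KEY = "high-availability";

    static boolean isHaEnabled(Map<String, String> flinkConf) {
        String mode = flinkConf.getOrDefault(HA_MODE_KEY, "NONE").trim();
        return !mode.equalsIgnoreCase("NONE");
    }

    /** Hypothetical check: last-state needs HA metadata to locate the latest checkpoint. */
    static Optional<String> validateUpgradeMode(UpgradeMode mode, Map<String, String> flinkConf) {
        if (mode == UpgradeMode.LAST_STATE && !isHaEnabled(flinkConf)) {
            return Optional.of("Upgrade mode last-state requires high-availability to be enabled");
        }
        return Optional.empty();
    }
}
```

In this sketch STATELESS and SAVEPOINT pass regardless of the HA setting. That matches the point that HA is not always necessary for `restartNonce`.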





[GitHub] [flink-kubernetes-operator] Aitozi commented on a change in pull request #85: [FLINK-26538] Add ability to restart flink deployment

Posted by GitBox <gi...@apache.org>.
Aitozi commented on a change in pull request #85:
URL: https://github.com/apache/flink-kubernetes-operator/pull/85#discussion_r835878018



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
##########
@@ -245,6 +241,19 @@
 
         JobSpec oldJob = oldSpec.getJob();
         JobSpec newJob = newSpec.getJob();
+
+        if (lastReconciledSpec(deployment) != null
+                && ReconciliationUtils.triggerRestart(deployment)) {
+            if (newJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job to %s state", newJob.getState()));
+            }
+            if (oldJob.getState() != JobState.RUNNING) {
+                return Optional.of(
+                        String.format("Cannot restart job in %s state", oldJob.getState()));
+            }
+        }

Review comment:
       This validation is meant to ensure that, when the user uses the `restartNonce`, the job targets the `RUNNING` state and is also currently in the `RUNNING` state. The `RUNNING` state here is the desired state of the job, not the observed state. Otherwise, what should we do if the job is in the `SUSPENDED` state and the user changes the `restartNonce`?



