You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/30 11:49:13 UTC

[GitHub] [flink-kubernetes-operator] gaborgsomogyi commented on a diff in pull request #438: [FLINK-29974] Not allowing the cancelling the which are already in the completed state.

gaborgsomogyi commented on code in PR #438:
URL: https://github.com/apache/flink-kubernetes-operator/pull/438#discussion_r1035860669


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java:
##########
@@ -232,6 +232,218 @@ public void testStatelessUpgrade() throws Exception {
                 flinkService.listJobs());
     }
 
+    @Test

Review Comment:
   I think this can be compacted as shown below. However, as I said in my previous comment, these tests pass regardless of whether your fix is present or not:
   ```
       @ParameterizedTest
       @EnumSource(org.apache.flink.api.common.JobStatus.class)
       public void testCancelStatelessSessionJob(org.apache.flink.api.common.JobStatus fromJobStatus) throws Exception {
           FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
   
           var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
   
           reconciler.reconcile(sessionJob, readyContext);
           assertEquals(1, flinkService.listJobs().size());
           verifyAndSetRunningJobsToStatus(
                   sessionJob,
                   JobState.RUNNING,
                   org.apache.flink.api.common.JobStatus.RECONCILING.name(),
                   null,
                   flinkService.listJobs());
   
           var statelessSessionJob = ReconciliationUtils.clone(sessionJob);
           var jobConfig = flinkService.listJobs().get(0).f2;
   
           // JobID must be equal.
           assertEquals(
                   statelessSessionJob.getStatus().getJobStatus().getJobId(),
                   flinkService.listJobs().get(0).f1.getJobId().toHexString());
   
           statelessSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
           statelessSessionJob.getSpec().getJob().setParallelism(2);
           reconciler.reconcile(statelessSessionJob, readyContext);
   
           statelessSessionJob
                   .getStatus()
                   .getJobStatus()
                   .setState(fromJobStatus.name());
           flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
           assertEquals(
                   org.apache.flink.api.common.JobStatus.FINISHED,
                   flinkService.listJobs().get(0).f1.getJobState());
           verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
       }
   ```
   



##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java:
##########
@@ -232,6 +232,218 @@ public void testStatelessUpgrade() throws Exception {
                 flinkService.listJobs());
     }
 
+    @Test
+    public void testCancelStatelessSessionJob() throws Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
+
+        reconciler.reconcile(sessionJob, readyContext);
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob,
+                JobState.RUNNING,
+                org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+                null,
+                flinkService.listJobs());
+
+        var statelessSessionJob = ReconciliationUtils.clone(sessionJob);
+        var jobConfig = flinkService.listJobs().get(0).f2;
+
+        // JobID must be equal.
+        assertEquals(
+                statelessSessionJob.getStatus().getJobStatus().getJobId(),
+                flinkService.listJobs().get(0).f1.getJobId().toHexString());
+
+        // Case RUNNING -> FINISHED
+        statelessSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
+        statelessSessionJob.getSpec().getJob().setParallelism(2);
+        flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
+        reconciler.reconcile(statelessSessionJob, readyContext);
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.FINISHED,
+                flinkService.listJobs().get(0).f1.getJobState());
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case FAILING -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.FAILING.name());
+        flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.FINISHED,
+                flinkService.listJobs().get(0).f1.getJobState());
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case CANCELLING -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.CANCELLING.name());
+        flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case RESTARTING -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.RESTARTING.name());
+        flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case FAILED -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.FAILED.name());
+        flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case FINISHED -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+        flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+
+        // Case CANCELLED -> FINISHED
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.CANCELED.name());
+        flinkService.cancelSessionJob(statelessSessionJob, UpgradeMode.STATELESS, jobConfig);
+        verifyJobState(statelessSessionJob, JobState.SUSPENDED, "FINISHED");
+    }
+
+    @Test
+    public void testCancelStatefulSessionJob() throws Exception {

Review Comment:
   * Same here: the parts of the test that do not expect an exception also pass without the fix.
   * Please split the test per source state instead of putting 10+ assertions into a single test.
   Either use a parameterized test as I suggested before, or extract the common functionality into a helper method. An example can be found [here](https://github.com/apache/flink-kubernetes-operator/blob/80e5f3f4add50355de6964c1466ba558d46bc79d/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java#L86-L102).



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -362,25 +362,31 @@ public void cancelSessionJob(
             FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf)
             throws Exception {
 
-        var jobStatus = sessionJob.getStatus().getJobStatus();
+        var sessionJobStatus = sessionJob.getStatus();
+        var jobStatus = sessionJobStatus.getJobStatus();
         var jobIdString = jobStatus.getJobId();
         Preconditions.checkNotNull(jobIdString, "The job to be suspend should not be null");
         var jobId = JobID.fromHexString(jobIdString);
         Optional<String> savepointOpt = Optional.empty();
+
+        LOG.debug("Current Job State, {}", jobStatus.getState());
+
         try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
             final String clusterId = clusterClient.getClusterId();
             switch (upgradeMode) {
                 case STATELESS:
-                    LOG.info("Cancelling job.");
-                    clusterClient
-                            .cancel(jobId)
-                            .get(
-                                    configManager
-                                            .getOperatorConfiguration()
-                                            .getFlinkCancelJobTimeout()
-                                            .toSeconds(),
-                                    TimeUnit.SECONDS);
-                    LOG.info("Job successfully cancelled.");
+                    if (ReconciliationUtils.isJobRunning(sessionJobStatus)) {

Review Comment:
   I just removed this if condition and the tests still pass. Something is wrong: the tests do not actually exercise this check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org