Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/15 11:39:10 UTC

[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #440: [FLINK-30004] Cleanup deployment after savepoint for Flink versions < 1.15

gyfora commented on code in PR #440:
URL: https://github.com/apache/flink-kubernetes-operator/pull/440#discussion_r1022682931


##########
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java:
##########
@@ -143,13 +146,28 @@ public void testCancelJobWithSavepointUpgradeMode() throws Exception {
         jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
         ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
 
+        deployment.getSpec().setFlinkVersion(flinkVersion);
         flinkService.cancelJob(
                 deployment, UpgradeMode.SAVEPOINT, configManager.getObserveConfig(deployment));
         assertTrue(stopWithSavepointFuture.isDone());
         assertEquals(jobID, stopWithSavepointFuture.get().f0);
         assertFalse(stopWithSavepointFuture.get().f1);
         assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
         assertEquals(savepointPath, jobStatus.getSavepointInfo().getLastSavepoint().getLocation());
+
+        if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) {
+            assertEquals(
+                    jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED.name());
+            assertEquals(
+                    deployment.getStatus().getJobManagerDeploymentStatus(),
+                    JobManagerDeploymentStatus.READY);
+        } else {
+            assertEquals(
+                    jobStatus.getState(), org.apache.flink.api.common.JobStatus.FINISHED.name());

Review Comment:
   The state assertion is identical in both branches, so it could be moved before the `if`.
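
   The suggestion can be illustrated with a minimal, self-contained sketch. It does not use the operator's real classes: `HoistedAssertionSketch`, `Version`, `verifyAfterCancel`, and `check` are hypothetical stand-ins, and plain strings replace the real status enums. The assertions for older versions are cut off in the diff excerpt, so the `else` branch is intentionally left empty rather than guessed.

```java
/** Sketch only: hypothetical stand-ins, not the operator's real test or classes. */
final class HoistedAssertionSketch {

    /** Hypothetical stand-in for the FlinkVersion enum referenced in the diff. */
    enum Version {
        V1_13, V1_14, V1_15, V1_16;

        boolean isNewerVersionThan(Version other) {
            return ordinal() > other.ordinal();
        }
    }

    /** Throws AssertionError when an observed value does not match its expectation. */
    static void verifyAfterCancel(Version version, String jobState, String jmStatus) {
        // Shared assertion, stated once before the version branch.
        check("FINISHED", jobState);

        if (version.isNewerVersionThan(Version.V1_14)) {
            // Only the version-specific assertion stays inside the branch.
            check("READY", jmStatus);
        } else {
            // The assertions for older versions are truncated in the excerpt,
            // so nothing is asserted here in this sketch.
        }
    }

    private static void check(String expected, String actual) {
        if (!expected.equals(actual)) {
            throw new AssertionError("expected " + expected + " but was " + actual);
        }
    }

    public static void main(String[] args) {
        verifyAfterCancel(Version.V1_15, "FINISHED", "READY");
        // jmStatus is not inspected for older versions in this sketch.
        verifyAfterCancel(Version.V1_13, "FINISHED", null);
        System.out.println("all checks passed");
    }
}
```

   In the actual test the same shape would apply with JUnit's `assertEquals` in place of the hypothetical `check` helper.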



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -326,6 +326,7 @@ protected void cancelJob(
                                 exception);
                     }
                     if (deleteClusterAfterSavepoint) {
+                        LOG.info("Cleaning up deployment after savepoint");

Review Comment:
   Maybe the message should be more specific, as in `stop-with-savepoint` or `savepoint-shutdown`.


