Posted to commits@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/09 12:12:14 UTC

[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a change in pull request #48: [FLINK-26507] Allow session and last-state jobs to reconcile in any state

wangyang0918 commented on a change in pull request #48:
URL: https://github.com/apache/flink-kubernetes-operator/pull/48#discussion_r822567083



##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -169,16 +189,24 @@ private void printCancelLogs(UpgradeMode upgradeMode, String name) {
         }
     }
 
-    private Optional<String> cancelJob(
+    private Optional<String> suspendJob(
             FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration effectiveConfig)
             throws Exception {
-        Optional<String> savepointOpt =
-                flinkService.cancelJob(
-                        JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId()),
-                        upgradeMode,
-                        effectiveConfig);
+
+        Optional<String> savepointOpt = Optional.empty();
+        if (upgradeMode == UpgradeMode.STATELESS) {
+            shutdown(flinkApp, effectiveConfig);
+        } else {
+            String jobIdString = flinkApp.getStatus().getJobStatus().getJobId();
+            savepointOpt =
+                    flinkService.cancelJob(
+                            jobIdString != null ? JobID.fromHexString(jobIdString) : null,

Review comment:
       It is not ideal that `jobId` could be `null` here. How about the following changes?
   ```
           Optional<String> savepointOpt = Optional.empty();
   
           // The job should be running if UpgradeMode is savepoint
           if (!isJobRunning(flinkApp)) {
               FlinkUtils.deleteCluster(flinkApp, kubernetesClient, true);
               return savepointOpt;
           }
   
           final String jobIdString = flinkApp.getStatus().getJobStatus().getJobId();
           savepointOpt =
                   flinkService.cancelJob(
                           JobID.fromHexString(jobIdString), upgradeMode, effectiveConfig);
   ```
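       For illustration, the guard-clause shape of this suggestion can be shown in isolation. This is a minimal sketch, not the operator's code: `SuspendJobSketch`, its `JobStatus` class, the `Runnable` standing in for `FlinkUtils.deleteCluster(...)` and the `Function` standing in for `flinkService.cancelJob(...)` are all hypothetical. It also assumes that "running" implies a known job ID, which is what lets the cancel path skip the `null` check.

   ```java
   import java.util.Optional;
   import java.util.function.Function;

   class SuspendJobSketch {

       // Hypothetical stand-in for the job status stored on the FlinkDeployment.
       static final class JobStatus {
           final String state; // e.g. "RUNNING"
           final String jobId; // may be null if no job was ever submitted

           JobStatus(String state, String jobId) {
               this.state = state;
               this.jobId = jobId;
           }
       }

       // Assumption: a job counts as running only with RUNNING state and a known ID.
       static boolean isJobRunning(JobStatus status) {
           return status != null && "RUNNING".equals(status.state) && status.jobId != null;
       }

       static Optional<String> suspendJob(
               JobStatus status,
               Runnable deleteCluster, // hypothetical stand-in for FlinkUtils.deleteCluster(...)
               Function<String, Optional<String>> cancelJob) { // stand-in for flinkService.cancelJob(...)
           // Guard clause: with no running job there is nothing to cancel, so the
           // cluster is torn down and no savepoint path is returned.
           if (!isJobRunning(status)) {
               deleteCluster.run();
               return Optional.empty();
           }
           // Past the guard, jobId is guaranteed to be non-null.
           return cancelJob.apply(status.jobId);
       }
   }
   ```

       Returning early keeps the `null` case out of the cancel path entirely, so the later call never has to accept a nullable job ID.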

##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
##########
@@ -72,19 +75,26 @@ public static FlinkDeployment buildApplicationCluster() {
                         JobSpec.builder()
                                 .jarURI(SAMPLE_JAR)
                                 .parallelism(1)
+                                .upgradeMode(UpgradeMode.STATELESS)
                                 .state(JobState.RUNNING)
                                 .build());
         return deployment;
     }
 
     public static FlinkDeploymentSpec getTestFlinkDeploymentSpec() {
+        Map<String, String> conf = new HashMap<>();
+        conf.put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2");
+        conf.put(

Review comment:
       We could use `HighAvailabilityOptions.HA_MODE` and `KubernetesHaServicesFactory.class.getCanonicalName()` here.
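       A rough sketch of what this could look like. So that the snippet compiles without Flink on the classpath, `ConfigOption`, `NUM_TASK_SLOTS`, `HA_MODE` and `KubernetesHaServicesFactory` below are hypothetical stand-ins for `TaskManagerOptions.NUM_TASK_SLOTS`, `HighAvailabilityOptions.HA_MODE` and the real factory class. The key strings are illustrative only. In the actual test, the Flink classes would be imported and used directly.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   class TestConfSketch {

       // Hypothetical stand-in for Flink's ConfigOption; only key() is modeled.
       static final class ConfigOption {
           private final String key;

           ConfigOption(String key) {
               this.key = key;
           }

           String key() {
               return key;
           }
       }

       // Stand-ins for TaskManagerOptions.NUM_TASK_SLOTS and
       // HighAvailabilityOptions.HA_MODE; the key strings are illustrative.
       static final ConfigOption NUM_TASK_SLOTS = new ConfigOption("taskmanager.numberOfTaskSlots");
       static final ConfigOption HA_MODE = new ConfigOption("high-availability");

       // Stand-in for KubernetesHaServicesFactory.
       static final class KubernetesHaServicesFactory {}

       static Map<String, String> getTestConf() {
           Map<String, String> conf = new HashMap<>();
           conf.put(NUM_TASK_SLOTS.key(), "2");
           // Deriving the value from the class literal keeps it in sync with the
           // class if it is renamed or moved, unlike a hard-coded class name.
           conf.put(HA_MODE.key(), KubernetesHaServicesFactory.class.getCanonicalName());
           return conf;
       }
   }
   ```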

##########
File path: flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
##########
@@ -76,14 +78,15 @@ public JobReconciler(
             IngressUtils.updateIngressRules(
                     flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
             ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
-            return JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(
-                    flinkApp, operatorConfiguration);
+            return;
+        }
+
+        if (SavepointUtils.savepointInProgress(flinkApp)) {
+            return;

Review comment:
       It would be great to have a log here, so that users can clearly see that reconciliation is delayed because a savepoint is in progress.
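       A minimal sketch of the early return with a log line. `ReconcileLogSketch` and its `Deployment` class are hypothetical. The boolean field stands in for the result of `SavepointUtils.savepointInProgress(flinkApp)`, and `java.util.logging` is used only to keep the snippet self-contained, in place of the SLF4J logger that Flink code normally uses. The message wording is a suggestion.

   ```java
   import java.util.logging.Logger;

   class ReconcileLogSketch {

       static final Logger LOG = Logger.getLogger(ReconcileLogSketch.class.getName());

       // Hypothetical stand-in: a deployment name plus the result of a check
       // such as SavepointUtils.savepointInProgress(flinkApp).
       static final class Deployment {
           final String name;
           final boolean savepointInProgress;

           Deployment(String name, boolean savepointInProgress) {
               this.name = name;
               this.savepointInProgress = savepointInProgress;
           }
       }

       /** Returns false when reconciliation is delayed, true when it proceeds. */
       static boolean reconcile(Deployment app) {
           if (app.savepointInProgress) {
               // Explain the early return so users are not left guessing why
               // their spec change has not been applied yet.
               LOG.info("Delaying reconciliation of " + app.name
                       + " because a savepoint is in progress");
               return false;
           }
           // ... the remaining reconciliation steps would run here ...
           return true;
       }
   }
   ```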

##########
File path: flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
##########
@@ -295,6 +302,113 @@ public void verifyStatelessUpgrade() {
         assertEquals(null, jobs.get(0).f0);
     }
 
+    @Test
+    public void testUpgradeNotReadyCluster() {
+        testUpgradeNotReadyCluster(TestUtils.buildSessionCluster(), true);
+
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
+        testUpgradeNotReadyCluster(appCluster, true);
+
+        appCluster = TestUtils.buildApplicationCluster();
+        appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        testUpgradeNotReadyCluster(appCluster, true);
+
+        appCluster = TestUtils.buildApplicationCluster();
+        appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+        testUpgradeNotReadyCluster(appCluster, false);
+    }
+
+    public void testUpgradeNotReadyCluster(FlinkDeployment appCluster, boolean allowUpgrade) {
+        mockServer

Review comment:
       I wonder whether we could use `@EnableKubernetesMockClient(crud = true)` for this test class, just as `FlinkServiceTest` does.
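       The expectations in `testUpgradeNotReadyCluster` reduce to one rule: when the cluster is not ready, an upgrade is allowed for a session cluster and for `STATELESS` and `LAST_STATE` jobs, but not for `SAVEPOINT`, since a savepoint can only be taken from a running job. Here is a minimal sketch of that rule. `NotReadyUpgradeSketch` and `allowUpgradeWhenNotReady` are hypothetical names, not the operator's API, and an empty `Optional` stands in for a session cluster that has no job spec.

   ```java
   import java.util.Optional;

   class NotReadyUpgradeSketch {

       // Mirrors the UpgradeMode values exercised by the test.
       enum UpgradeMode { STATELESS, LAST_STATE, SAVEPOINT }

       /**
        * Whether a spec change may be applied while the cluster is not ready.
        * An empty mode represents a session cluster (no job spec).
        */
       static boolean allowUpgradeWhenNotReady(Optional<UpgradeMode> mode) {
           // Only a savepoint upgrade needs a running job to take the savepoint from.
           return mode.map(m -> m != UpgradeMode.SAVEPOINT).orElse(true);
       }
   }
   ```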



