You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/22 08:59:29 UTC

[flink-table-store] branch master updated: [FLINK-28192] Remove job status assertion in RescaleBucketITCase

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new d8e415a  [FLINK-28192] Remove job status assertion in RescaleBucketITCase
d8e415a is described below

commit d8e415ae88677698903d0445cbca19dd12270a75
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Wed Jun 22 16:59:25 2022 +0800

    [FLINK-28192] Remove job status assertion in RescaleBucketITCase
    
    This closes #170
---
 .../table/store/connector/RescaleBucketITCase.java      | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
index ce3f167..2adfe20 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
@@ -92,10 +92,10 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
         ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
 
         // step1: run streaming insert
-        JobID jobId = startJobAndAssertStatusTransition(client, streamSql, null);
+        JobID jobId = startJobAndCommitSnapshot(streamSql, null);
 
         // step2: stop with savepoint
-        stopJobAndAssertStatusTransition(client, jobId);
+        stopJobSafely(client, jobId);
 
         final Snapshot snapshotBeforeRescale = findLatestSnapshot("T3", false);
         assertThat(snapshotBeforeRescale).isNotNull();
@@ -116,10 +116,9 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
         assertThat(batchSql("SELECT * FROM T3")).containsExactlyInAnyOrderElementsOf(committedData);
 
         // step5: resume streaming job
-        JobID resumedJobId =
-                startJobAndAssertStatusTransition(client, streamSql, snapshotAfterRescale.id());
+        JobID resumedJobId = startJobAndCommitSnapshot(streamSql, snapshotAfterRescale.id());
         // stop job
-        stopJobAndAssertStatusTransition(client, resumedJobId);
+        stopJobSafely(client, resumedJobId);
 
         // check snapshot and schema
         Snapshot lastSnapshot = findLatestSnapshot("T3", false);
@@ -144,22 +143,20 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
         }
     }
 
-    private JobID startJobAndAssertStatusTransition(
-            ClusterClient<?> client, String sql, @Nullable Long initSnapshotId) throws Exception {
+    private JobID startJobAndCommitSnapshot(String sql, @Nullable Long initSnapshotId)
+            throws Exception {
         JobID jobId = sEnv.executeSql(sql).getJobClient().get().getJobID();
         // let job run until the first snapshot is finished
         waitForTheNextSnapshot(initSnapshotId);
-        assertThat(client.getJobStatus(jobId).get()).isEqualTo(JobStatus.RUNNING);
         return jobId;
     }
 
-    private void stopJobAndAssertStatusTransition(ClusterClient<?> client, JobID jobId)
+    private void stopJobSafely(ClusterClient<?> client, JobID jobId)
             throws ExecutionException, InterruptedException {
         client.stopWithSavepoint(jobId, true, path, SavepointFormatType.DEFAULT);
         while (client.getJobStatus(jobId).get() == JobStatus.RUNNING) {
             Thread.sleep(2000L);
         }
-        assertThat(client.getJobStatus(jobId).get()).isEqualTo(JobStatus.FINISHED);
     }
 
     private void assertLatestSchema(