You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/22 08:59:29 UTC
[flink-table-store] branch master updated: [FLINK-28192] Remove job status assertion in RescaleBucketITCase
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new d8e415a [FLINK-28192] Remove job status assertion in RescaleBucketITCase
d8e415a is described below
commit d8e415ae88677698903d0445cbca19dd12270a75
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Wed Jun 22 16:59:25 2022 +0800
[FLINK-28192] Remove job status assertion in RescaleBucketITCase
This closes #170
---
.../table/store/connector/RescaleBucketITCase.java | 17 +++++++----------
1 file changed, 7 insertions(+), 10 deletions(-)
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
index ce3f167..2adfe20 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/RescaleBucketITCase.java
@@ -92,10 +92,10 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
// step1: run streaming insert
- JobID jobId = startJobAndAssertStatusTransition(client, streamSql, null);
+ JobID jobId = startJobAndCommitSnapshot(streamSql, null);
// step2: stop with savepoint
- stopJobAndAssertStatusTransition(client, jobId);
+ stopJobSafely(client, jobId);
final Snapshot snapshotBeforeRescale = findLatestSnapshot("T3", false);
assertThat(snapshotBeforeRescale).isNotNull();
@@ -116,10 +116,9 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
assertThat(batchSql("SELECT * FROM T3")).containsExactlyInAnyOrderElementsOf(committedData);
// step5: resume streaming job
- JobID resumedJobId =
- startJobAndAssertStatusTransition(client, streamSql, snapshotAfterRescale.id());
+ JobID resumedJobId = startJobAndCommitSnapshot(streamSql, snapshotAfterRescale.id());
// stop job
- stopJobAndAssertStatusTransition(client, resumedJobId);
+ stopJobSafely(client, resumedJobId);
// check snapshot and schema
Snapshot lastSnapshot = findLatestSnapshot("T3", false);
@@ -144,22 +143,20 @@ public class RescaleBucketITCase extends FileStoreTableITCase {
}
}
- private JobID startJobAndAssertStatusTransition(
- ClusterClient<?> client, String sql, @Nullable Long initSnapshotId) throws Exception {
+ private JobID startJobAndCommitSnapshot(String sql, @Nullable Long initSnapshotId)
+ throws Exception {
JobID jobId = sEnv.executeSql(sql).getJobClient().get().getJobID();
// let job run until the first snapshot is finished
waitForTheNextSnapshot(initSnapshotId);
- assertThat(client.getJobStatus(jobId).get()).isEqualTo(JobStatus.RUNNING);
return jobId;
}
- private void stopJobAndAssertStatusTransition(ClusterClient<?> client, JobID jobId)
+ private void stopJobSafely(ClusterClient<?> client, JobID jobId)
throws ExecutionException, InterruptedException {
client.stopWithSavepoint(jobId, true, path, SavepointFormatType.DEFAULT);
while (client.getJobStatus(jobId).get() == JobStatus.RUNNING) {
Thread.sleep(2000L);
}
- assertThat(client.getJobStatus(jobId).get()).isEqualTo(JobStatus.FINISHED);
}
private void assertLatestSchema(