You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by cz...@apache.org on 2023/01/10 07:34:22 UTC
[flink-table-store] branch master updated: [FLINK-30603] Fix unstable test CompactActionITCase#testStreamingCompact
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new b7188bcc [FLINK-30603] Fix unstable test CompactActionITCase#testStreamingCompact
b7188bcc is described below
commit b7188bcc46989c66e44f1fb04cd45972e1a6fe50
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Jan 10 15:34:17 2023 +0800
[FLINK-30603] Fix unstable test CompactActionITCase#testStreamingCompact
This closes #473.
---
.../store/connector/action/CompactActionITCase.java | 21 ++++++++++++++++-----
1 file changed, 16 insertions(+), 5 deletions(-)
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
index 643880c5..7160529e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
@@ -60,6 +60,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
/** IT cases for {@link CompactAction}. */
public class CompactActionITCase extends AbstractTestBase {
@@ -112,6 +113,7 @@ public class CompactActionITCase extends AbstractTestBase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.setParallelism(ThreadLocalRandom.current().nextInt(4) + 1);
new CompactAction(tablePath).withPartitions(getSpecifiedPartitions()).build(env);
env.execute();
@@ -169,6 +171,7 @@ public class CompactActionITCase extends AbstractTestBase {
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(500);
+ env.setParallelism(ThreadLocalRandom.current().nextInt(4) + 1);
new CompactAction(tablePath).withPartitions(getSpecifiedPartitions()).build(env);
env.executeAsync();
@@ -181,8 +184,11 @@ public class CompactActionITCase extends AbstractTestBase {
}
// first full compaction
- Assert.assertEquals(2, (long) plan.snapshotId);
- List<String> actual = getResult(table.newRead(), plan.splits());
+ List<String> actual = new ArrayList<>();
+ while (plan != null) {
+ actual.addAll(getResult(table.newRead(), plan.splits()));
+ plan = snapshotEnumerator.enumerate();
+ }
actual.sort(String::compareTo);
Assert.assertEquals(Arrays.asList("+I 1|100|15|20221208", "+I 1|100|15|20221209"), actual);
@@ -204,8 +210,11 @@ public class CompactActionITCase extends AbstractTestBase {
}
// second full compaction
- Assert.assertEquals(4, (long) plan.snapshotId);
- actual = getResult(table.newRead(), plan.splits());
+ actual = new ArrayList<>();
+ while (plan != null) {
+ actual.addAll(getResult(table.newRead(), plan.splits()));
+ plan = snapshotEnumerator.enumerate();
+ }
actual.sort(String::compareTo);
Assert.assertEquals(
Arrays.asList(
@@ -216,7 +225,9 @@ public class CompactActionITCase extends AbstractTestBase {
actual);
// assert dedicated compact job will expire snapshots
- Assert.assertEquals(2L, (long) snapshotManager.earliestSnapshotId());
+ Assert.assertEquals(
+ snapshotManager.latestSnapshotId() - 2,
+ (long) snapshotManager.earliestSnapshotId());
}
private List<Map<String, String>> getSpecifiedPartitions() {