You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by cz...@apache.org on 2023/01/10 07:34:22 UTC

[flink-table-store] branch master updated: [FLINK-30603] Fix unstable test CompactActionITCase#testStreamingCompact

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new b7188bcc [FLINK-30603] Fix unstable test CompactActionITCase#testStreamingCompact
b7188bcc is described below

commit b7188bcc46989c66e44f1fb04cd45972e1a6fe50
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Jan 10 15:34:17 2023 +0800

    [FLINK-30603] Fix unstable test CompactActionITCase#testStreamingCompact
    
    This closes #473.
---
 .../store/connector/action/CompactActionITCase.java | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
index 643880c5..7160529e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/action/CompactActionITCase.java
@@ -60,6 +60,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 /** IT cases for {@link CompactAction}. */
 public class CompactActionITCase extends AbstractTestBase {
@@ -112,6 +113,7 @@ public class CompactActionITCase extends AbstractTestBase {
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(ThreadLocalRandom.current().nextInt(4) + 1);
         new CompactAction(tablePath).withPartitions(getSpecifiedPartitions()).build(env);
         env.execute();
 
@@ -169,6 +171,7 @@ public class CompactActionITCase extends AbstractTestBase {
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
         env.getCheckpointConfig().setCheckpointInterval(500);
+        env.setParallelism(ThreadLocalRandom.current().nextInt(4) + 1);
         new CompactAction(tablePath).withPartitions(getSpecifiedPartitions()).build(env);
         env.executeAsync();
 
@@ -181,8 +184,11 @@ public class CompactActionITCase extends AbstractTestBase {
         }
 
         // first full compaction
-        Assert.assertEquals(2, (long) plan.snapshotId);
-        List<String> actual = getResult(table.newRead(), plan.splits());
+        List<String> actual = new ArrayList<>();
+        while (plan != null) {
+            actual.addAll(getResult(table.newRead(), plan.splits()));
+            plan = snapshotEnumerator.enumerate();
+        }
         actual.sort(String::compareTo);
         Assert.assertEquals(Arrays.asList("+I 1|100|15|20221208", "+I 1|100|15|20221209"), actual);
 
@@ -204,8 +210,11 @@ public class CompactActionITCase extends AbstractTestBase {
         }
 
         // second full compaction
-        Assert.assertEquals(4, (long) plan.snapshotId);
-        actual = getResult(table.newRead(), plan.splits());
+        actual = new ArrayList<>();
+        while (plan != null) {
+            actual.addAll(getResult(table.newRead(), plan.splits()));
+            plan = snapshotEnumerator.enumerate();
+        }
         actual.sort(String::compareTo);
         Assert.assertEquals(
                 Arrays.asList(
@@ -216,7 +225,9 @@ public class CompactActionITCase extends AbstractTestBase {
                 actual);
 
         // assert dedicated compact job will expire snapshots
-        Assert.assertEquals(2L, (long) snapshotManager.earliestSnapshotId());
+        Assert.assertEquals(
+                snapshotManager.latestSnapshotId() - 2,
+                (long) snapshotManager.earliestSnapshotId());
     }
 
     private List<Map<String, String>> getSpecifiedPartitions() {