You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/10 09:51:34 UTC

[flink-table-store] branch master updated: [FLINK-26293] Improve FileStoreCommitTest.testSnapshotAddLogOffset

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 89fce84  [FLINK-26293] Improve FileStoreCommitTest.testSnapshotAddLogOffset
89fce84 is described below

commit 89fce843044a233839085afce29a9c68add2577c
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Mar 10 17:47:08 2022 +0800

    [FLINK-26293] Improve FileStoreCommitTest.testSnapshotAddLogOffset
---
 .../flink/table/store/file/TestFileStore.java      | 20 +++--
 .../store/file/operation/FileStoreCommitTest.java  | 88 ++++++++--------------
 2 files changed, 43 insertions(+), 65 deletions(-)

diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index f10642b..70ca005 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -60,7 +60,6 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -76,8 +75,6 @@ public class TestFileStore extends FileStoreImpl {
     private final RowDataSerializer keySerializer;
     private final RowDataSerializer valueSerializer;
 
-    private static final AtomicInteger ID = new AtomicInteger();
-
     public static TestFileStore create(
             String format,
             String root,
@@ -130,12 +127,24 @@ public class TestFileStore extends FileStoreImpl {
             Function<KeyValue, BinaryRowData> partitionCalculator,
             Function<KeyValue, Integer> bucketCalculator)
             throws Exception {
+        return commitData(kvs, partitionCalculator, bucketCalculator, new HashMap<>());
+    }
+
+    public List<Snapshot> commitData(
+            List<KeyValue> kvs,
+            Function<KeyValue, BinaryRowData> partitionCalculator,
+            Function<KeyValue, Integer> bucketCalculator,
+            Map<Integer, Long> logOffsets)
+            throws Exception {
         return commitDataImpl(
                 kvs,
                 partitionCalculator,
                 bucketCalculator,
                 FileStoreWrite::createWriter,
-                (commit, committable) -> commit.commit(committable, Collections.emptyMap()));
+                (commit, committable) -> {
+                    logOffsets.forEach(committable::addLogOffset);
+                    commit.commit(committable, Collections.emptyMap());
+                });
     }
 
     public List<Snapshot> overwriteData(
@@ -191,9 +200,6 @@ public class TestFileStore extends FileStoreImpl {
                 Increment increment = entryWithBucket.getValue().prepareCommit();
                 committable.addFileCommittable(
                         entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
-                if (!committable.logOffsets().containsKey(entryWithBucket.getKey())) {
-                    committable.addLogOffset(entryWithBucket.getKey(), ID.getAndIncrement());
-                }
             }
         }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 7224d19..6a4c04b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -210,66 +210,32 @@ public class FileStoreCommitTest {
 
     @Test
     public void testSnapshotAddLogOffset() throws Exception {
-        Map<BinaryRowData, List<KeyValue>> data1 =
-                generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
-        logData(
-                () ->
-                        data1.values().stream()
-                                .flatMap(Collection::stream)
-                                .collect(Collectors.toList()),
-                "data1");
-
-        TestFileStore store = createStore(false);
-        List<Snapshot> commitSnapshots =
-                store.commitData(
-                        data1.values().stream()
-                                .flatMap(Collection::stream)
-                                .collect(Collectors.toList()),
-                        gen::getPartition,
-                        kv -> Math.toIntExact(kv.sequenceNumber() % 10));
-
-        Map<Integer, Long> commitLogOffsets = commitSnapshots.get(0).getLogOffsets();
-        assertThat(commitLogOffsets.size()).isEqualTo(10);
-        commitLogOffsets.forEach((key, value) -> assertThat(key).isEqualTo(value.intValue()));
-
-        ThreadLocalRandom random = ThreadLocalRandom.current();
-        String dtToOverwrite =
-                new ArrayList<>(data1.keySet())
-                        .get(random.nextInt(data1.size()))
-                        .getString(0)
-                        .toString();
-        Map<String, String> partitionToOverwrite = new HashMap<>();
-        partitionToOverwrite.put("dt", dtToOverwrite);
-
-        // overwrite partial commit
-        int numRecords = ThreadLocalRandom.current().nextInt(5) + 1;
-        Map<BinaryRowData, List<KeyValue>> data2 = generateData(numRecords);
-        data2.entrySet().removeIf(e -> !dtToOverwrite.equals(e.getKey().getString(0).toString()));
-        logData(
-                () ->
-                        data2.values().stream()
-                                .flatMap(Collection::stream)
-                                .collect(Collectors.toList()),
-                "data2");
-        List<Snapshot> overwriteSnapshots =
-                store.overwriteData(
-                        data2.values().stream()
-                                .flatMap(Collection::stream)
-                                .collect(Collectors.toList()),
-                        gen::getPartition,
-                        kv -> Math.toIntExact(kv.sequenceNumber() % 10),
-                        partitionToOverwrite);
-
-        Map<Integer, Long> overwriteLogOffsets = overwriteSnapshots.get(0).getLogOffsets();
-        assertThat(overwriteLogOffsets.size()).isEqualTo(commitLogOffsets.size());
-        assertThat(
-                        overwriteLogOffsets.entrySet().stream()
-                                .filter(o -> !o.getKey().equals(o.getValue().intValue()))
-                                .count())
-                .isLessThanOrEqualTo(numRecords);
+        TestFileStore store = createStore(false, 2);
+
+        // commit 1
+        Map<Integer, Long> offsets = new HashMap<>();
+        offsets.put(0, 1L);
+        offsets.put(1, 3L);
+        Snapshot snapshot =
+                store.commitData(generateDataList(10), gen::getPartition, kv -> 0, offsets).get(0);
+        assertThat(snapshot.getLogOffsets()).isEqualTo(offsets);
+
+        // commit 2
+        offsets = new HashMap<>();
+        offsets.put(1, 8L);
+        snapshot =
+                store.commitData(generateDataList(10), gen::getPartition, kv -> 0, offsets).get(0);
+        Map<Integer, Long> expected = new HashMap<>();
+        expected.put(0, 1L);
+        expected.put(1, 8L);
+        assertThat(snapshot.getLogOffsets()).isEqualTo(expected);
     }
 
     private TestFileStore createStore(boolean failing) {
+        return createStore(failing, 1);
+    }
+
+    private TestFileStore createStore(boolean failing, int numBucket) {
         String root =
                 failing
                         ? FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString())
@@ -277,13 +243,19 @@ public class FileStoreCommitTest {
         return TestFileStore.create(
                 "avro",
                 root,
-                1,
+                numBucket,
                 TestKeyValueGenerator.PARTITION_TYPE,
                 TestKeyValueGenerator.KEY_TYPE,
                 TestKeyValueGenerator.ROW_TYPE,
                 new DeduplicateAccumulator());
     }
 
+    private List<KeyValue> generateDataList(int numRecords) {
+        return generateData(numRecords).values().stream()
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+    }
+
     private Map<BinaryRowData, List<KeyValue>> generateData(int numRecords) {
         Map<BinaryRowData, List<KeyValue>> data = new HashMap<>();
         for (int i = 0; i < numRecords; i++) {