Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/10 09:51:34 UTC
[flink-table-store] branch master updated: [FLINK-26293] Improve FileStoreCommitTest.testSnapshotAddLogOffset
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 89fce84 [FLINK-26293] Improve FileStoreCommitTest.testSnapshotAddLogOffset
89fce84 is described below
commit 89fce843044a233839085afce29a9c68add2577c
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Mar 10 17:47:08 2022 +0800
[FLINK-26293] Improve FileStoreCommitTest.testSnapshotAddLogOffset
---
.../flink/table/store/file/TestFileStore.java | 20 +++--
.../store/file/operation/FileStoreCommitTest.java | 88 ++++++++--------------
2 files changed, 43 insertions(+), 65 deletions(-)
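In short: the test no longer relies on TestFileStore auto-assigning log offsets from a static AtomicInteger. Callers now pass an explicit Map<Integer, Long> of bucket-to-offset entries into a new commitData overload, and the test asserts that a snapshot's log offsets are the previous snapshot's offsets overlaid with the newly committed ones. Below is a minimal sketch of that merge behavior as the test pins it down; LogOffsetMergeSketch and merge are hypothetical names for illustration, not the actual FileStoreCommit implementation.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch of the offset merge behavior asserted by the
    // improved test; not the actual FileStoreCommit implementation.
    public class LogOffsetMergeSketch {

        // A new snapshot starts from the previous snapshot's offsets and
        // overwrites only the buckets present in the new commit.
        static Map<Integer, Long> merge(
                Map<Integer, Long> previous, Map<Integer, Long> committed) {
            Map<Integer, Long> merged = new HashMap<>(previous);
            merged.putAll(committed);
            return merged;
        }

        public static void main(String[] args) {
            Map<Integer, Long> commit1 = new HashMap<>();
            commit1.put(0, 1L); // bucket 0 -> offset 1
            commit1.put(1, 3L); // bucket 1 -> offset 3

            Map<Integer, Long> commit2 = new HashMap<>();
            commit2.put(1, 8L); // only bucket 1 advances

            // Mirrors the test below: {0=1, 1=3} then {1=8} yields {0=1, 1=8}.
            System.out.println(merge(commit1, commit2));
        }
    }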
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index f10642b..70ca005 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -60,7 +60,6 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -76,8 +75,6 @@ public class TestFileStore extends FileStoreImpl {
private final RowDataSerializer keySerializer;
private final RowDataSerializer valueSerializer;
- private static final AtomicInteger ID = new AtomicInteger();
-
public static TestFileStore create(
String format,
String root,
@@ -130,12 +127,24 @@ public class TestFileStore extends FileStoreImpl {
Function<KeyValue, BinaryRowData> partitionCalculator,
Function<KeyValue, Integer> bucketCalculator)
throws Exception {
+ return commitData(kvs, partitionCalculator, bucketCalculator, new HashMap<>());
+ }
+
+ public List<Snapshot> commitData(
+ List<KeyValue> kvs,
+ Function<KeyValue, BinaryRowData> partitionCalculator,
+ Function<KeyValue, Integer> bucketCalculator,
+ Map<Integer, Long> logOffsets)
+ throws Exception {
return commitDataImpl(
kvs,
partitionCalculator,
bucketCalculator,
FileStoreWrite::createWriter,
- (commit, committable) -> commit.commit(committable, Collections.emptyMap()));
+ (commit, committable) -> {
+ logOffsets.forEach(committable::addLogOffset);
+ commit.commit(committable, Collections.emptyMap());
+ });
}
public List<Snapshot> overwriteData(
@@ -191,9 +200,6 @@ public class TestFileStore extends FileStoreImpl {
Increment increment = entryWithBucket.getValue().prepareCommit();
committable.addFileCommittable(
entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
- if (!committable.logOffsets().containsKey(entryWithBucket.getKey())) {
- committable.addLogOffset(entryWithBucket.getKey(), ID.getAndIncrement());
- }
}
}
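For orientation, here is a stripped-down sketch of the committable surface the new commitData overload relies on. The OffsetsOnlyCommittable below is a hypothetical stand-in that keeps only the two methods the diff above touches (addLogOffset and logOffsets) and omits the file changes the real committable also tracks.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in; the real committable in flink-table-store
    // also carries file changes per partition and bucket.
    class OffsetsOnlyCommittable {

        private final Map<Integer, Long> logOffsets = new HashMap<>();

        // Record the latest consumed log offset for a bucket; a later call
        // for the same bucket overwrites the earlier one.
        public void addLogOffset(int bucket, long offset) {
            logOffsets.put(bucket, offset);
        }

        public Map<Integer, Long> logOffsets() {
            return logOffsets;
        }
    }

With this shape, logOffsets.forEach(committable::addLogOffset) in the new overload simply copies the caller-supplied offsets into the committable before commit.commit(...) runs.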
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 7224d19..6a4c04b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -210,66 +210,32 @@ public class FileStoreCommitTest {
@Test
public void testSnapshotAddLogOffset() throws Exception {
- Map<BinaryRowData, List<KeyValue>> data1 =
- generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
- logData(
- () ->
- data1.values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toList()),
- "data1");
-
- TestFileStore store = createStore(false);
- List<Snapshot> commitSnapshots =
- store.commitData(
- data1.values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toList()),
- gen::getPartition,
- kv -> Math.toIntExact(kv.sequenceNumber() % 10));
-
- Map<Integer, Long> commitLogOffsets = commitSnapshots.get(0).getLogOffsets();
- assertThat(commitLogOffsets.size()).isEqualTo(10);
- commitLogOffsets.forEach((key, value) -> assertThat(key).isEqualTo(value.intValue()));
-
- ThreadLocalRandom random = ThreadLocalRandom.current();
- String dtToOverwrite =
- new ArrayList<>(data1.keySet())
- .get(random.nextInt(data1.size()))
- .getString(0)
- .toString();
- Map<String, String> partitionToOverwrite = new HashMap<>();
- partitionToOverwrite.put("dt", dtToOverwrite);
-
- // overwrite partial commit
- int numRecords = ThreadLocalRandom.current().nextInt(5) + 1;
- Map<BinaryRowData, List<KeyValue>> data2 = generateData(numRecords);
- data2.entrySet().removeIf(e -> !dtToOverwrite.equals(e.getKey().getString(0).toString()));
- logData(
- () ->
- data2.values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toList()),
- "data2");
- List<Snapshot> overwriteSnapshots =
- store.overwriteData(
- data2.values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toList()),
- gen::getPartition,
- kv -> Math.toIntExact(kv.sequenceNumber() % 10),
- partitionToOverwrite);
-
- Map<Integer, Long> overwriteLogOffsets = overwriteSnapshots.get(0).getLogOffsets();
- assertThat(overwriteLogOffsets.size()).isEqualTo(commitLogOffsets.size());
- assertThat(
- overwriteLogOffsets.entrySet().stream()
- .filter(o -> !o.getKey().equals(o.getValue().intValue()))
- .count())
- .isLessThanOrEqualTo(numRecords);
+ TestFileStore store = createStore(false, 2);
+
+ // commit 1
+ Map<Integer, Long> offsets = new HashMap<>();
+ offsets.put(0, 1L);
+ offsets.put(1, 3L);
+ Snapshot snapshot =
+ store.commitData(generateDataList(10), gen::getPartition, kv -> 0, offsets).get(0);
+ assertThat(snapshot.getLogOffsets()).isEqualTo(offsets);
+
+ // commit 2
+ offsets = new HashMap<>();
+ offsets.put(1, 8L);
+ snapshot =
+ store.commitData(generateDataList(10), gen::getPartition, kv -> 0, offsets).get(0);
+ Map<Integer, Long> expected = new HashMap<>();
+ expected.put(0, 1L);
+ expected.put(1, 8L);
+ assertThat(snapshot.getLogOffsets()).isEqualTo(expected);
}
private TestFileStore createStore(boolean failing) {
+ return createStore(failing, 1);
+ }
+
+ private TestFileStore createStore(boolean failing, int numBucket) {
String root =
failing
? FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString())
@@ -277,13 +243,19 @@ public class FileStoreCommitTest {
return TestFileStore.create(
"avro",
root,
- 1,
+ numBucket,
TestKeyValueGenerator.PARTITION_TYPE,
TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.ROW_TYPE,
new DeduplicateAccumulator());
}
+ private List<KeyValue> generateDataList(int numRecords) {
+ return generateData(numRecords).values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
+
private Map<BinaryRowData, List<KeyValue>> generateData(int numRecords) {
Map<BinaryRowData, List<KeyValue>> data = new HashMap<>();
for (int i = 0; i < numRecords; i++) {
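A note on the test setup above: the store is created with two buckets, but the bucket calculator kv -> 0 pins every record to bucket 0, so bucket 1 never receives files and its snapshot offsets can only come from the explicit logOffsets map. Commit 2 then sets an offset only for bucket 1, and the assertion checks that bucket 0 keeps its earlier offset (1L): offsets persist per bucket across snapshots regardless of which buckets a commit writes files to or mentions in logOffsets. Finally, the new generateDataList helper is just the flatten step the old test repeated inline; a generic sketch, with String as a hypothetical stand-in for KeyValue:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class FlattenSketch {
        public static void main(String[] args) {
            // Hypothetical stand-in types: String for KeyValue/BinaryRowData.
            // generateData groups generated KeyValues by partition ...
            Map<String, List<String>> grouped = new HashMap<>();
            grouped.put("dt=20220310", Arrays.asList("kv1", "kv2"));
            grouped.put("dt=20220311", Collections.singletonList("kv3"));

            // ... and generateDataList flattens the groups into one list
            // suitable for commitData.
            List<String> flat = grouped.values().stream()
                    .flatMap(Collection::stream)
                    .collect(Collectors.toList());

            System.out.println(flat); // e.g. [kv3, kv1, kv2] (map order)
        }
    }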