You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/01 09:49:34 UTC
[flink-table-store] branch master updated: [hotfix] Refactor OperationTestUtils into TestFileStore and remove unnecessary repeated tests
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 55385ae [hotfix] Refactor OperationTestUtils into TestFileStore and remove unnecessary repeated tests
55385ae is described below
commit 55385ae8a556f87b22687a4f2a7ca93245ddcaba
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Mar 1 17:49:27 2022 +0800
[hotfix] Refactor OperationTestUtils into TestFileStore and remove unnecessary repeated tests
This closes #27
---
.../flink/table/store/file/FileStoreImpl.java | 7 +-
.../store/file/manifest/ManifestCommittable.java | 12 +-
.../store/file/operation/FileStoreCommitImpl.java | 11 ++
.../OperationTestUtils.java => TestFileStore.java} | 201 +++++++--------------
.../store/file/operation/FileStoreCommitTest.java | 62 ++++---
.../store/file/operation/FileStoreExpireTest.java | 58 +++---
.../store/file/operation/FileStoreScanTest.java | 62 +++----
.../store/file/operation/TestCommitThread.java | 15 +-
8 files changed, 183 insertions(+), 245 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
index 52857fa..2fa5c48 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.file;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
@@ -71,7 +72,8 @@ public class FileStoreImpl implements FileStore {
.generateRecordComparator("KeyComparator");
}
- private FileStorePathFactory pathFactory() {
+ @VisibleForTesting
+ public FileStorePathFactory pathFactory() {
return new FileStorePathFactory(
options.path(), partitionType, options.partitionDefaultName());
}
@@ -81,7 +83,8 @@ public class FileStoreImpl implements FileStore {
partitionType, keyType, valueType, options.manifestFormat(), pathFactory());
}
- private ManifestList.Factory manifestListFactory() {
+ @VisibleForTesting
+ public ManifestList.Factory manifestListFactory() {
return new ManifestList.Factory(partitionType, options.manifestFormat(), pathFactory());
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
index a461c2d..339d1d9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
@@ -126,16 +126,16 @@ public class ManifestCommittable {
@Override
public String toString() {
- return "ManifestCommittable{"
- + "identifier="
+ return "ManifestCommittable { "
+ + "identifier = "
+ identifier
- + ", logOffsets="
+ + ", logOffsets = "
+ logOffsets
- + ", newFiles="
+ + ", newFiles =\n"
+ filesToString(newFiles)
- + ", compactBefore="
+ + ", compactBefore =\n"
+ filesToString(compactBefore)
- + ", compactAfter="
+ + ", compactAfter =\n"
+ filesToString(compactAfter)
+ '}';
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index fe20b45..d8f8211 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -378,6 +378,17 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
if (success) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ String.format(
+ "Successfully commit snapshot #%d (path %s) by user %s "
+ + "with identifier %s and kind %s.",
+ newSnapshotId,
+ newSnapshotPath.toString(),
+ commitUser,
+ identifier,
+ commitKind.name()));
+ }
return true;
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
similarity index 56%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 0fe2fe0..38b4057 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -16,31 +16,31 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.operation;
+package org.apache.flink.table.store.file;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
-import org.apache.flink.table.store.file.manifest.ManifestFile;
-import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
-import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.operation.FileStoreCommit;
+import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
+import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.RecordWriter;
-import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.function.QuadFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -55,154 +55,93 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.Function;
-/** Utils for operation tests. */
-public class OperationTestUtils {
+/** {@link FileStore} for tests. */
+public class TestFileStore extends FileStoreImpl {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestFileStore.class);
+
+ private final RowDataSerializer keySerializer;
+ private final RowDataSerializer valueSerializer;
- public static MergeTreeOptions getMergeTreeOptions(boolean forceCompact) {
+ public static TestFileStore create(
+ String format,
+ String root,
+ int numBuckets,
+ RowType partitionType,
+ RowType keyType,
+ RowType valueType,
+ Accumulator accumulator) {
Configuration conf = new Configuration();
+
conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, MemorySize.parse("16 kb"));
conf.set(MergeTreeOptions.PAGE_SIZE, MemorySize.parse("4 kb"));
conf.set(MergeTreeOptions.TARGET_FILE_SIZE, MemorySize.parse("1 kb"));
- conf.set(MergeTreeOptions.COMMIT_FORCE_COMPACT, forceCompact);
- return new MergeTreeOptions(conf);
- }
-
- public static FileStoreScan createScan(
- FileFormat fileFormat, FileStorePathFactory pathFactory) {
- return new FileStoreScanImpl(
- TestKeyValueGenerator.PARTITION_TYPE,
- pathFactory,
- createManifestFileFactory(fileFormat, pathFactory),
- createManifestListFactory(fileFormat, pathFactory));
- }
-
- public static FileStoreCommit createCommit(
- FileFormat fileFormat, FileStorePathFactory pathFactory) {
- ManifestFile.Factory testManifestFileFactory =
- createManifestFileFactory(fileFormat, pathFactory);
- ManifestList.Factory testManifestListFactory =
- createManifestListFactory(fileFormat, pathFactory);
- return new FileStoreCommitImpl(
- UUID.randomUUID().toString(),
- TestKeyValueGenerator.PARTITION_TYPE,
- pathFactory,
- testManifestFileFactory,
- testManifestListFactory,
- createScan(fileFormat, pathFactory),
- 1,
- MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) + "kb"),
- 30);
- }
-
- public static FileStoreWrite createWrite(
- FileFormat fileFormat, FileStorePathFactory pathFactory) {
- return new FileStoreWriteImpl(
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
- TestKeyValueGenerator.KEY_COMPARATOR,
- new DeduplicateAccumulator(),
- fileFormat,
- pathFactory,
- createScan(fileFormat, pathFactory),
- getMergeTreeOptions(false));
- }
- public static FileStoreExpire createExpire(
- int numRetained,
- long millisRetained,
- FileFormat fileFormat,
- FileStorePathFactory pathFactory) {
- return new FileStoreExpireImpl(
- numRetained,
- millisRetained,
- pathFactory,
- createManifestListFactory(fileFormat, pathFactory),
- createScan(fileFormat, pathFactory));
- }
+ conf.set(
+ FileStoreOptions.MANIFEST_TARGET_FILE_SIZE,
+ MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) + "kb"));
- public static FileStoreRead createRead(
- FileFormat fileFormat, FileStorePathFactory pathFactory) {
- return new FileStoreReadImpl(
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
- TestKeyValueGenerator.KEY_COMPARATOR,
- new DeduplicateAccumulator(),
- fileFormat,
- pathFactory);
- }
+ conf.set(FileStoreOptions.FILE_FORMAT, format);
+ conf.set(FileStoreOptions.MANIFEST_FORMAT, format);
+ conf.set(FileStoreOptions.FILE_PATH, root);
+ conf.set(FileStoreOptions.BUCKET, numBuckets);
- public static FileStorePathFactory createPathFactory(boolean failing, String root) {
- String path =
- failing
- ? FailingAtomicRenameFileSystem.getFailingPath(root)
- : TestAtomicRenameFileSystem.SCHEME + "://" + root;
- return new FileStorePathFactory(
- new Path(path), TestKeyValueGenerator.PARTITION_TYPE, "default");
+ return new TestFileStore(conf, partitionType, keyType, valueType, accumulator);
}
- private static ManifestFile.Factory createManifestFileFactory(
- FileFormat fileFormat, FileStorePathFactory pathFactory) {
- return new ManifestFile.Factory(
- TestKeyValueGenerator.PARTITION_TYPE,
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
- fileFormat,
- pathFactory);
+ public TestFileStore(
+ Configuration conf,
+ RowType partitionType,
+ RowType keyType,
+ RowType valueType,
+ Accumulator accumulator) {
+ super(conf, UUID.randomUUID().toString(), partitionType, keyType, valueType, accumulator);
+ this.keySerializer = new RowDataSerializer(keyType);
+ this.valueSerializer = new RowDataSerializer(valueType);
}
- private static ManifestList.Factory createManifestListFactory(
- FileFormat fileFormat, FileStorePathFactory pathFactory) {
- return new ManifestList.Factory(
- TestKeyValueGenerator.PARTITION_TYPE, fileFormat, pathFactory);
+ public FileStoreExpireImpl newExpire(int numRetained, long millisRetained) {
+ return new FileStoreExpireImpl(
+ numRetained, millisRetained, pathFactory(), manifestListFactory(), newScan());
}
- public static List<Snapshot> commitData(
+ public List<Snapshot> commitData(
List<KeyValue> kvs,
Function<KeyValue, BinaryRowData> partitionCalculator,
- Function<KeyValue, Integer> bucketCalculator,
- FileFormat fileFormat,
- FileStorePathFactory pathFactory)
+ Function<KeyValue, Integer> bucketCalculator)
throws Exception {
return commitDataImpl(
kvs,
partitionCalculator,
bucketCalculator,
- fileFormat,
- pathFactory,
FileStoreWrite::createWriter,
(commit, committable) -> commit.commit(committable, Collections.emptyMap()));
}
- public static List<Snapshot> overwriteData(
+ public List<Snapshot> overwriteData(
List<KeyValue> kvs,
Function<KeyValue, BinaryRowData> partitionCalculator,
Function<KeyValue, Integer> bucketCalculator,
- FileFormat fileFormat,
- FileStorePathFactory pathFactory,
Map<String, String> partition)
throws Exception {
return commitDataImpl(
kvs,
partitionCalculator,
bucketCalculator,
- fileFormat,
- pathFactory,
FileStoreWrite::createEmptyWriter,
(commit, committable) ->
commit.overwrite(partition, committable, Collections.emptyMap()));
}
- private static List<Snapshot> commitDataImpl(
+ private List<Snapshot> commitDataImpl(
List<KeyValue> kvs,
Function<KeyValue, BinaryRowData> partitionCalculator,
Function<KeyValue, Integer> bucketCalculator,
- FileFormat fileFormat,
- FileStorePathFactory pathFactory,
QuadFunction<FileStoreWrite, BinaryRowData, Integer, ExecutorService, RecordWriter>
createWriterFunction,
BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
throws Exception {
- FileStoreWrite write = createWrite(fileFormat, pathFactory);
+ FileStoreWrite write = newWrite();
Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>();
for (KeyValue kv : kvs) {
BinaryRowData partition = partitionCalculator.apply(kv);
@@ -222,7 +161,7 @@ public class OperationTestUtils {
.write(kv.valueKind(), kv.key(), kv.value());
}
- FileStoreCommit commit = createCommit(fileFormat, pathFactory);
+ FileStoreCommit commit = newCommit();
ManifestCommittable committable =
new ManifestCommittable(String.valueOf(new Random().nextLong()));
for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> entryWithPartition :
@@ -235,6 +174,7 @@ public class OperationTestUtils {
}
}
+ FileStorePathFactory pathFactory = pathFactory();
Long snapshotIdBeforeCommit = pathFactory.latestSnapshotId();
if (snapshotIdBeforeCommit == null) {
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
@@ -263,17 +203,19 @@ public class OperationTestUtils {
return snapshots;
}
- public static List<KeyValue> readKvsFromSnapshot(
- long snapshotId, FileFormat fileFormat, FileStorePathFactory pathFactory)
- throws IOException {
- List<ManifestEntry> entries =
- createScan(fileFormat, pathFactory).withSnapshot(snapshotId).plan().files();
- return readKvsFromManifestEntries(entries, fileFormat, pathFactory);
+ public List<KeyValue> readKvsFromSnapshot(long snapshotId) throws IOException {
+ List<ManifestEntry> entries = newScan().withSnapshot(snapshotId).plan().files();
+ return readKvsFromManifestEntries(entries);
}
- public static List<KeyValue> readKvsFromManifestEntries(
- List<ManifestEntry> entries, FileFormat fileFormat, FileStorePathFactory pathFactory)
+ public List<KeyValue> readKvsFromManifestEntries(List<ManifestEntry> entries)
throws IOException {
+ if (LOG.isDebugEnabled()) {
+ for (ManifestEntry entry : entries) {
+ LOG.debug("reading from " + entry.toString());
+ }
+ }
+
Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> filesPerPartitionAndBucket =
new HashMap<>();
for (ManifestEntry entry : entries) {
@@ -284,7 +226,7 @@ public class OperationTestUtils {
}
List<KeyValue> kvs = new ArrayList<>();
- FileStoreRead read = createRead(fileFormat, pathFactory);
+ FileStoreRead read = newRead();
for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>> entryWithPartition :
filesPerPartitionAndBucket.entrySet()) {
for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
@@ -296,23 +238,18 @@ public class OperationTestUtils {
entryWithBucket.getKey(),
entryWithBucket.getValue()));
while (iterator.hasNext()) {
- kvs.add(
- iterator.next()
- .copy(
- TestKeyValueGenerator.KEY_SERIALIZER,
- TestKeyValueGenerator.ROW_SERIALIZER));
+ kvs.add(iterator.next().copy(keySerializer, valueSerializer));
}
}
}
return kvs;
}
- public static Map<BinaryRowData, BinaryRowData> toKvMap(List<KeyValue> kvs) {
+ public Map<BinaryRowData, BinaryRowData> toKvMap(List<KeyValue> kvs) {
Map<BinaryRowData, BinaryRowData> result = new HashMap<>();
for (KeyValue kv : kvs) {
- BinaryRowData key = TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(kv.key()).copy();
- BinaryRowData value =
- TestKeyValueGenerator.ROW_SERIALIZER.toBinaryRow(kv.value()).copy();
+ BinaryRowData key = keySerializer.toBinaryRow(kv.key()).copy();
+ BinaryRowData value = valueSerializer.toBinaryRow(kv.value()).copy();
switch (kv.valueKind()) {
case ADD:
result.put(key, value);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 136dff1..c558ed4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -18,15 +18,15 @@
package org.apache.flink.table.store.file.operation;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -53,10 +53,6 @@ public class FileStoreCommitTest {
private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitTest.class);
- private final FileFormat avro =
- FileFormat.fromIdentifier(
- FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
-
private TestKeyValueGenerator gen;
@TempDir java.nio.file.Path tempDir;
@@ -108,13 +104,13 @@ public class FileStoreCommitTest {
for (int i = 0; i < numThreads; i++) {
TestCommitThread thread =
new TestCommitThread(
- dataPerThread.get(i),
- OperationTestUtils.createPathFactory(failing, tempDir.toString()),
- OperationTestUtils.createPathFactory(false, tempDir.toString()));
+ dataPerThread.get(i), createStore(failing), createStore(false));
thread.start();
threads.add(thread);
}
+ TestFileStore store = createStore(false);
+
// calculate expected results
Map<BinaryRowData, List<KeyValue>> threadResults = new HashMap<>();
for (TestCommitThread thread : threads) {
@@ -124,21 +120,18 @@ public class FileStoreCommitTest {
}
}
Map<BinaryRowData, BinaryRowData> expected =
- OperationTestUtils.toKvMap(
+ store.toKvMap(
threadResults.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList()));
// read actual data and compare
- FileStorePathFactory safePathFactory =
- OperationTestUtils.createPathFactory(false, tempDir.toString());
- Long snapshotId = safePathFactory.latestSnapshotId();
+ Long snapshotId = store.pathFactory().latestSnapshotId();
assertThat(snapshotId).isNotNull();
- List<KeyValue> actualKvs =
- OperationTestUtils.readKvsFromSnapshot(snapshotId, avro, safePathFactory);
+ List<KeyValue> actualKvs = store.readKvsFromSnapshot(snapshotId);
gen.sort(actualKvs);
logData(() -> actualKvs, "raw read results");
- Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
+ Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
logData(() -> kvMapToKvList(expected), "expected");
logData(() -> kvMapToKvList(actual), "actual");
assertThat(actual).isEqualTo(expected);
@@ -155,14 +148,11 @@ public class FileStoreCommitTest {
.collect(Collectors.toList()),
"data1");
- FileStorePathFactory pathFactory =
- OperationTestUtils.createPathFactory(false, tempDir.toString());
- OperationTestUtils.commitData(
+ TestFileStore store = createStore(false);
+ store.commitData(
data1.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
gen::getPartition,
- kv -> 0,
- avro,
- pathFactory);
+ kv -> 0);
ThreadLocalRandom random = ThreadLocalRandom.current();
String dtToOverwrite =
@@ -186,12 +176,10 @@ public class FileStoreCommitTest {
.flatMap(Collection::stream)
.collect(Collectors.toList()),
"data2");
- OperationTestUtils.overwriteData(
+ store.overwriteData(
data2.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
gen::getPartition,
kv -> 0,
- avro,
- pathFactory,
partitionToOverwrite);
List<KeyValue> expectedKvs = new ArrayList<>();
@@ -203,19 +191,33 @@ public class FileStoreCommitTest {
}
data2.values().forEach(expectedKvs::addAll);
gen.sort(expectedKvs);
- Map<BinaryRowData, BinaryRowData> expected = OperationTestUtils.toKvMap(expectedKvs);
+ Map<BinaryRowData, BinaryRowData> expected = store.toKvMap(expectedKvs);
List<KeyValue> actualKvs =
- OperationTestUtils.readKvsFromSnapshot(
- pathFactory.latestSnapshotId(), avro, pathFactory);
+ store.readKvsFromSnapshot(store.pathFactory().latestSnapshotId());
gen.sort(actualKvs);
- Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
+ Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
logData(() -> kvMapToKvList(expected), "expected");
logData(() -> kvMapToKvList(actual), "actual");
assertThat(actual).isEqualTo(expected);
}
+ private TestFileStore createStore(boolean failing) {
+ String root =
+ failing
+ ? FailingAtomicRenameFileSystem.getFailingPath(tempDir.toString())
+ : TestAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString();
+ return TestFileStore.create(
+ "avro",
+ root,
+ 1,
+ TestKeyValueGenerator.PARTITION_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.ROW_TYPE,
+ new DeduplicateAccumulator());
+ }
+
private Map<BinaryRowData, List<KeyValue>> generateData(int numRecords) {
Map<BinaryRowData, List<KeyValue>> data = new HashMap<>();
for (int i = 0; i < numRecords; i++) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index 54ba19f..e007ca0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -18,14 +18,14 @@
package org.apache.flink.table.store.file.operation;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.junit.jupiter.api.BeforeEach;
@@ -43,26 +43,31 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link FileStoreExpireImpl}. */
public class FileStoreExpireTest {
- private final FileFormat avro =
- FileFormat.fromIdentifier(
- FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
-
private TestKeyValueGenerator gen;
@TempDir java.nio.file.Path tempDir;
+ private TestFileStore store;
private FileStorePathFactory pathFactory;
@BeforeEach
public void beforeEach() throws IOException {
gen = new TestKeyValueGenerator();
- pathFactory = OperationTestUtils.createPathFactory(false, tempDir.toString());
Path root = new Path(tempDir.toString());
root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
+ store =
+ TestFileStore.create(
+ "avro",
+ tempDir.toString(),
+ 1,
+ TestKeyValueGenerator.PARTITION_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.ROW_TYPE,
+ new DeduplicateAccumulator());
+ pathFactory = store.pathFactory();
}
@Test
public void testNoSnapshot() {
- FileStoreExpire expire =
- OperationTestUtils.createExpire(3, Long.MAX_VALUE, avro, pathFactory);
+ FileStoreExpire expire = store.newExpire(3, Long.MAX_VALUE);
expire.expire();
assertThat(pathFactory.latestSnapshotId()).isNull();
@@ -74,9 +79,7 @@ public class FileStoreExpireTest {
List<Integer> snapshotPositions = new ArrayList<>();
commit(2, allData, snapshotPositions);
int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
- FileStoreExpire expire =
- OperationTestUtils.createExpire(
- latestSnapshotId + 1, Long.MAX_VALUE, avro, pathFactory);
+ FileStoreExpire expire = store.newExpire(latestSnapshotId + 1, Long.MAX_VALUE);
expire.expire();
FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
@@ -92,9 +95,7 @@ public class FileStoreExpireTest {
List<Integer> snapshotPositions = new ArrayList<>();
commit(5, allData, snapshotPositions);
int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
- FileStoreExpire expire =
- OperationTestUtils.createExpire(
- Integer.MAX_VALUE, Long.MAX_VALUE, avro, pathFactory);
+ FileStoreExpire expire = store.newExpire(Integer.MAX_VALUE, Long.MAX_VALUE);
expire.expire();
FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
@@ -111,8 +112,7 @@ public class FileStoreExpireTest {
commit(3, allData, snapshotPositions);
int latestSnapshotId = pathFactory.latestSnapshotId().intValue();
Thread.sleep(100);
- FileStoreExpire expire =
- OperationTestUtils.createExpire(Integer.MAX_VALUE, 1, avro, pathFactory);
+ FileStoreExpire expire = store.newExpire(Integer.MAX_VALUE, 1);
expire.expire();
FileSystem fs = pathFactory.toSnapshotPath(latestSnapshotId).getFileSystem();
@@ -125,8 +125,7 @@ public class FileStoreExpireTest {
@Test
public void testExpireWithNumber() throws Exception {
- FileStoreExpire expire =
- OperationTestUtils.createExpire(3, Long.MAX_VALUE, avro, pathFactory);
+ FileStoreExpire expire = store.newExpire(3, Long.MAX_VALUE);
List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();
@@ -149,8 +148,7 @@ public class FileStoreExpireTest {
@Test
public void testExpireWithTime() throws Exception {
- FileStoreExpire expire =
- OperationTestUtils.createExpire(Integer.MAX_VALUE, 1000, avro, pathFactory);
+ FileStoreExpire expire = store.newExpire(Integer.MAX_VALUE, 1000);
List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();
@@ -183,9 +181,7 @@ public class FileStoreExpireTest {
data.add(gen.next());
}
allData.addAll(data);
- List<Snapshot> snapshots =
- OperationTestUtils.commitData(
- data, gen::getPartition, kv -> 0, avro, pathFactory);
+ List<Snapshot> snapshots = store.commitData(data, gen::getPartition, kv -> 0);
for (int j = 0; j < snapshots.size(); j++) {
snapshotPositions.add(allData.size());
}
@@ -196,18 +192,12 @@ public class FileStoreExpireTest {
int snapshotId, List<KeyValue> allData, List<Integer> snapshotPositions)
throws Exception {
Map<BinaryRowData, BinaryRowData> expected =
- OperationTestUtils.toKvMap(
- allData.subList(0, snapshotPositions.get(snapshotId - 1)));
+ store.toKvMap(allData.subList(0, snapshotPositions.get(snapshotId - 1)));
List<KeyValue> actualKvs =
- OperationTestUtils.readKvsFromManifestEntries(
- OperationTestUtils.createScan(avro, pathFactory)
- .withSnapshot(snapshotId)
- .plan()
- .files(),
- avro,
- pathFactory);
+ store.readKvsFromManifestEntries(
+ store.newScan().withSnapshot(snapshotId).plan().files());
gen.sort(actualKvs);
- Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
+ Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
assertThat(actual).isEqualTo(expected);
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
index 6f21c9e..bde976a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -18,19 +18,19 @@
package org.apache.flink.table.store.file.operation;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
@@ -50,23 +50,29 @@ public class FileStoreScanTest {
private static final int NUM_BUCKETS = 10;
- private final FileFormat avro =
- FileFormat.fromIdentifier(
- FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
-
private TestKeyValueGenerator gen;
@TempDir java.nio.file.Path tempDir;
+ private TestFileStore store;
private FileStorePathFactory pathFactory;
@BeforeEach
public void beforeEach() throws IOException {
gen = new TestKeyValueGenerator();
- pathFactory = OperationTestUtils.createPathFactory(false, tempDir.toString());
Path root = new Path(tempDir.toString());
root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
+ store =
+ TestFileStore.create(
+ "avro",
+ tempDir.toString(),
+ NUM_BUCKETS,
+ TestKeyValueGenerator.PARTITION_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.ROW_TYPE,
+ new DeduplicateAccumulator());
+ pathFactory = store.pathFactory();
}
- @RepeatedTest(10)
+ @Test
public void testWithPartitionFilter() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
List<KeyValue> data = generateData(random.nextInt(1000) + 1);
@@ -82,12 +88,12 @@ public class FileStoreScanTest {
wantedPartitions.add(partitions.get(random.nextInt(partitions.size())));
}
- FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+ FileStoreScan scan = store.newScan();
scan.withSnapshot(snapshot.id());
scan.withPartitionFilter(new ArrayList<>(wantedPartitions));
Map<BinaryRowData, BinaryRowData> expected =
- OperationTestUtils.toKvMap(
+ store.toKvMap(
wantedPartitions.isEmpty()
? data
: data.stream()
@@ -99,7 +105,7 @@ public class FileStoreScanTest {
runTest(scan, snapshot.id(), expected);
}
- @RepeatedTest(10)
+ @Test
public void testWithBucket() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
List<KeyValue> data = generateData(random.nextInt(1000) + 1);
@@ -107,19 +113,19 @@ public class FileStoreScanTest {
int wantedBucket = random.nextInt(NUM_BUCKETS);
- FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+ FileStoreScan scan = store.newScan();
scan.withSnapshot(snapshot.id());
scan.withBucket(wantedBucket);
Map<BinaryRowData, BinaryRowData> expected =
- OperationTestUtils.toKvMap(
+ store.toKvMap(
data.stream()
.filter(kv -> getBucket(kv) == wantedBucket)
.collect(Collectors.toList()));
runTest(scan, snapshot.id(), expected);
}
- @RepeatedTest(10)
+ @Test
public void testWithSnapshot() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
int numCommits = random.nextInt(10) + 1;
@@ -134,18 +140,18 @@ public class FileStoreScanTest {
}
long wantedSnapshot = snapshots.get(wantedCommit).id();
- FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+ FileStoreScan scan = store.newScan();
scan.withSnapshot(wantedSnapshot);
Map<BinaryRowData, BinaryRowData> expected =
- OperationTestUtils.toKvMap(
+ store.toKvMap(
allData.subList(0, wantedCommit + 1).stream()
.flatMap(Collection::stream)
.collect(Collectors.toList()));
runTest(scan, wantedSnapshot, expected);
}
- @RepeatedTest(10)
+ @Test
public void testWithManifestList() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
int numCommits = random.nextInt(10) + 1;
@@ -154,22 +160,19 @@ public class FileStoreScanTest {
writeData(data);
}
- ManifestList manifestList =
- new ManifestList.Factory(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory)
- .create();
+ ManifestList manifestList = store.manifestListFactory().create();
long wantedSnapshot = random.nextLong(pathFactory.latestSnapshotId()) + 1;
List<ManifestFileMeta> wantedManifests =
manifestList.read(
Snapshot.fromPath(pathFactory.toSnapshotPath(wantedSnapshot))
.manifestList());
- FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+ FileStoreScan scan = store.newScan();
scan.withManifestList(wantedManifests);
- List<KeyValue> expectedKvs =
- OperationTestUtils.readKvsFromSnapshot(wantedSnapshot, avro, pathFactory);
+ List<KeyValue> expectedKvs = store.readKvsFromSnapshot(wantedSnapshot);
gen.sort(expectedKvs);
- Map<BinaryRowData, BinaryRowData> expected = OperationTestUtils.toKvMap(expectedKvs);
+ Map<BinaryRowData, BinaryRowData> expected = store.toKvMap(expectedKvs);
runTest(scan, null, expected);
}
@@ -179,10 +182,9 @@ public class FileStoreScanTest {
FileStoreScan.Plan plan = scan.plan();
assertThat(plan.snapshotId()).isEqualTo(expectedSnapshotId);
- List<KeyValue> actualKvs =
- OperationTestUtils.readKvsFromManifestEntries(plan.files(), avro, pathFactory);
+ List<KeyValue> actualKvs = store.readKvsFromManifestEntries(plan.files());
gen.sort(actualKvs);
- Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
+ Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs);
assertThat(actual).isEqualTo(expected);
}
@@ -195,9 +197,7 @@ public class FileStoreScanTest {
}
private Snapshot writeData(List<KeyValue> kvs) throws Exception {
- List<Snapshot> snapshots =
- OperationTestUtils.commitData(
- kvs, gen::getPartition, this::getBucket, avro, pathFactory);
+ List<Snapshot> snapshots = store.commitData(kvs, gen::getPartition, this::getBucket);
return snapshots.get(snapshots.size() - 1);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index c6b6501..7ed31a8 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -18,14 +18,12 @@
package org.apache.flink.table.store.file.operation;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,17 +52,14 @@ public class TestCommitThread extends Thread {
public TestCommitThread(
Map<BinaryRowData, List<KeyValue>> data,
- FileStorePathFactory testPathFactory,
- FileStorePathFactory safePathFactory) {
+ TestFileStore testStore,
+ TestFileStore safeStore) {
this.data = data;
this.result = new HashMap<>();
this.writers = new HashMap<>();
- FileFormat avro =
- FileFormat.fromIdentifier(
- FileStoreCommitTest.class.getClassLoader(), "avro", new Configuration());
- this.write = OperationTestUtils.createWrite(avro, safePathFactory);
- this.commit = OperationTestUtils.createCommit(avro, testPathFactory);
+ this.write = safeStore.newWrite();
+ this.commit = testStore.newCommit();
}
public Map<BinaryRowData, List<KeyValue>> getResult() {