You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/29 02:42:29 UTC
[flink-table-store] branch master updated: [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new b647508 [FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl
b647508 is described below
commit b647508352eec54051550e2da9852fc144579824
Author: tsreaper <ts...@gmail.com>
AuthorDate: Sat Jan 29 10:40:29 2022 +0800
[FLINK-25803] Implement partition and bucket filter in FileStoreScanImpl
This closes #14
---
.../table/store/file/manifest/ManifestFile.java | 53 ++++-
.../table/store/file/manifest/ManifestList.java | 43 +++-
.../store/file/mergetree/MergeTreeFactory.java | 64 ++++++
.../table/store/file/mergetree/sst/SstFile.java | 57 ++++-
.../store/file/mergetree/sst/SstPathFactory.java | 10 +-
.../store/file/operation/FileStoreCommitImpl.java | 30 ++-
.../table/store/file/operation/FileStoreScan.java | 3 +
.../store/file/operation/FileStoreScanImpl.java | 155 ++++++++++---
.../store/file/operation/FileStoreWriteImpl.java | 71 ++++++
.../file/stats/FieldStatsArraySerializer.java | 22 +-
.../store/file/stats/FieldStatsCollector.java | 17 +-
.../store/file/utils/FileStorePathFactory.java | 22 +-
.../flink/table/store/file/utils/FileUtils.java | 18 ++
.../file/utils/RowDataToObjectArrayConverter.java | 55 +++++
.../store/file/manifest/ManifestFileMetaTest.java | 13 +-
.../store/file/manifest/ManifestFileTest.java | 13 +-
.../store/file/manifest/ManifestListTest.java | 3 +-
.../table/store/file/mergetree/MergeTreeTest.java | 31 +--
.../store/file/mergetree/sst/SstFileTest.java | 23 +-
.../file/operation/FileStoreCommitTestBase.java | 113 ++-------
.../store/file/operation/FileStoreScanTest.java | 255 +++++++++++++++++++++
.../store/file/operation/OperationTestUtils.java | 190 +++++++++++++++
.../store/file/operation/TestCommitThread.java | 115 +---------
23 files changed, 1033 insertions(+), 343 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 1b2bb92..77b3ba7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -47,17 +47,16 @@ public class ManifestFile {
private final BulkWriter.Factory<RowData> writerFactory;
private final FileStorePathFactory pathFactory;
- public ManifestFile(
+ private ManifestFile(
RowType partitionType,
- RowType keyType,
- RowType rowType,
- FileFormat fileFormat,
+ ManifestEntrySerializer serializer,
+ BulkFormat<RowData, FileSourceSplit> readerFactory,
+ BulkWriter.Factory<RowData> writerFactory,
FileStorePathFactory pathFactory) {
this.partitionType = partitionType;
- this.serializer = new ManifestEntrySerializer(partitionType, keyType, rowType);
- RowType entryType = ManifestEntry.schema(partitionType, keyType, rowType);
- this.readerFactory = fileFormat.createReaderFactory(entryType);
- this.writerFactory = fileFormat.createWriterFactory(entryType);
+ this.serializer = serializer;
+ this.readerFactory = readerFactory;
+ this.writerFactory = writerFactory;
this.pathFactory = pathFactory;
}
@@ -123,4 +122,42 @@ public class ManifestFile {
public void delete(String fileName) {
FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName));
}
+
+ /**
+ * Creator of {@link ManifestFile}. It reuses {@link BulkFormat} and {@link BulkWriter.Factory}
+ * from {@link FileFormat}.
+ */
+ public static class Factory {
+
+ private final RowType partitionType;
+ private final RowType keyType;
+ private final RowType rowType;
+ private final BulkFormat<RowData, FileSourceSplit> readerFactory;
+ private final BulkWriter.Factory<RowData> writerFactory;
+ private final FileStorePathFactory pathFactory;
+
+ public Factory(
+ RowType partitionType,
+ RowType keyType,
+ RowType rowType,
+ FileFormat fileFormat,
+ FileStorePathFactory pathFactory) {
+ this.partitionType = partitionType;
+ this.keyType = keyType;
+ this.rowType = rowType;
+ RowType entryType = ManifestEntry.schema(partitionType, keyType, rowType);
+ this.readerFactory = fileFormat.createReaderFactory(entryType);
+ this.writerFactory = fileFormat.createWriterFactory(entryType);
+ this.pathFactory = pathFactory;
+ }
+
+ public ManifestFile create() {
+ return new ManifestFile(
+ partitionType,
+ new ManifestEntrySerializer(partitionType, keyType, rowType),
+ readerFactory,
+ writerFactory,
+ pathFactory);
+ }
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
index 1b008dd..8a9707b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
@@ -45,12 +45,14 @@ public class ManifestList {
private final BulkWriter.Factory<RowData> writerFactory;
private final FileStorePathFactory pathFactory;
- public ManifestList(
- RowType partitionType, FileFormat fileFormat, FileStorePathFactory pathFactory) {
- this.serializer = new ManifestFileMetaSerializer(partitionType);
- RowType metaType = ManifestFileMeta.schema(partitionType);
- this.readerFactory = fileFormat.createReaderFactory(metaType);
- this.writerFactory = fileFormat.createWriterFactory(metaType);
+ private ManifestList(
+ ManifestFileMetaSerializer serializer,
+ BulkFormat<RowData, FileSourceSplit> readerFactory,
+ BulkWriter.Factory<RowData> writerFactory,
+ FileStorePathFactory pathFactory) {
+ this.serializer = serializer;
+ this.readerFactory = readerFactory;
+ this.writerFactory = writerFactory;
this.pathFactory = pathFactory;
}
@@ -97,4 +99,33 @@ public class ManifestList {
public void delete(String fileName) {
FileUtils.deleteOrWarn(pathFactory.toManifestListPath(fileName));
}
+
+ /**
+ * Creator of {@link ManifestList}. It reuses {@link BulkFormat} and {@link BulkWriter.Factory}
+ * from {@link FileFormat}.
+ */
+ public static class Factory {
+
+ private final RowType partitionType;
+ private final BulkFormat<RowData, FileSourceSplit> readerFactory;
+ private final BulkWriter.Factory<RowData> writerFactory;
+ private final FileStorePathFactory pathFactory;
+
+ public Factory(
+ RowType partitionType, FileFormat fileFormat, FileStorePathFactory pathFactory) {
+ this.partitionType = partitionType;
+ RowType metaType = ManifestFileMeta.schema(partitionType);
+ this.readerFactory = fileFormat.createReaderFactory(metaType);
+ this.writerFactory = fileFormat.createWriterFactory(metaType);
+ this.pathFactory = pathFactory;
+ }
+
+ public ManifestList create() {
+ return new ManifestList(
+ new ManifestFileMetaSerializer(partitionType),
+ readerFactory,
+ writerFactory,
+ pathFactory);
+ }
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeFactory.java
new file mode 100644
index 0000000..d24d517
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.mergetree.compact.Accumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+
+/** Create a new {@link MergeTree} with specific partition and bucket. */
+public class MergeTreeFactory {
+
+ private final Comparator<RowData> keyComparator;
+ private final Accumulator accumulator;
+ private final MergeTreeOptions mergeTreeOptions;
+ private final SstFile.Factory sstFileFactory;
+
+ public MergeTreeFactory(
+ RowType keyType,
+ RowType rowType,
+ Comparator<RowData> keyComparator,
+ Accumulator accumulator,
+ FileFormat fileFormat,
+ FileStorePathFactory pathFactory,
+ MergeTreeOptions mergeTreeOptions) {
+ this.keyComparator = keyComparator;
+ this.accumulator = accumulator;
+ this.mergeTreeOptions = mergeTreeOptions;
+ this.sstFileFactory =
+ new SstFile.Factory(
+ keyType, rowType, fileFormat, pathFactory, mergeTreeOptions.targetFileSize);
+ }
+
+ public MergeTree create(BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+ return new MergeTree(
+ mergeTreeOptions,
+ sstFileFactory.create(partition, bucket),
+ keyComparator,
+ compactExecutor,
+ accumulator);
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
index caae17b..058bf8a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.types.logical.RowType;
@@ -57,24 +58,22 @@ public class SstFile {
private final RowType keyType;
private final RowType valueType;
-
private final BulkFormat<RowData, FileSourceSplit> readerFactory;
private final BulkWriter.Factory<RowData> writerFactory;
private final SstPathFactory pathFactory;
private final long suggestedFileSize;
- public SstFile(
+ private SstFile(
RowType keyType,
RowType valueType,
- FileFormat fileFormat,
+ BulkFormat<RowData, FileSourceSplit> readerFactory,
+ BulkWriter.Factory<RowData> writerFactory,
SstPathFactory pathFactory,
long suggestedFileSize) {
this.keyType = keyType;
this.valueType = valueType;
-
- RowType recordType = KeyValue.schema(keyType, valueType);
- this.readerFactory = fileFormat.createReaderFactory(recordType);
- this.writerFactory = fileFormat.createWriterFactory(recordType);
+ this.readerFactory = readerFactory;
+ this.writerFactory = writerFactory;
this.pathFactory = pathFactory;
this.suggestedFileSize = suggestedFileSize;
}
@@ -88,6 +87,11 @@ public class SstFile {
}
@VisibleForTesting
+ public SstPathFactory pathFactory() {
+ return pathFactory;
+ }
+
+ @VisibleForTesting
public long suggestedFileSize() {
return suggestedFileSize;
}
@@ -277,4 +281,43 @@ public class SstFile {
level);
}
}
+
+ /**
+ * Creator of {@link SstFile}. It reuses {@link BulkFormat} and {@link BulkWriter.Factory} from
+ * {@link FileFormat}.
+ */
+ public static class Factory {
+
+ private final RowType keyType;
+ private final RowType valueType;
+ private final BulkFormat<RowData, FileSourceSplit> readerFactory;
+ private final BulkWriter.Factory<RowData> writerFactory;
+ private final FileStorePathFactory pathFactory;
+ private final long suggestedFileSize;
+
+ public Factory(
+ RowType keyType,
+ RowType valueType,
+ FileFormat fileFormat,
+ FileStorePathFactory pathFactory,
+ long suggestedFileSize) {
+ this.keyType = keyType;
+ this.valueType = valueType;
+ RowType recordType = KeyValue.schema(keyType, valueType);
+ this.readerFactory = fileFormat.createReaderFactory(recordType);
+ this.writerFactory = fileFormat.createWriterFactory(recordType);
+ this.pathFactory = pathFactory;
+ this.suggestedFileSize = suggestedFileSize;
+ }
+
+ public SstFile create(BinaryRowData partition, int bucket) {
+ return new SstFile(
+ keyType,
+ valueType,
+ readerFactory,
+ writerFactory,
+ pathFactory.createSstPathFactory(partition, bucket),
+ suggestedFileSize);
+ }
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactory.java
index 8634098..ccc8ec6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstPathFactory.java
@@ -21,25 +21,29 @@ package org.apache.flink.table.store.file.mergetree.sst;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
+import javax.annotation.concurrent.ThreadSafe;
+
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
/** Factory which produces new {@link Path}s for sst files. */
+@ThreadSafe
public class SstPathFactory {
private final Path bucketDir;
private final String uuid;
- private int pathCount;
+ private final AtomicInteger pathCount;
public SstPathFactory(Path root, String partition, int bucket) {
this.bucketDir = new Path(root + "/" + partition + "/bucket-" + bucket);
this.uuid = UUID.randomUUID().toString();
- this.pathCount = 0;
+ this.pathCount = new AtomicInteger(0);
}
public Path newPath() {
- return new Path(bucketDir + "/sst-" + uuid + "-" + (pathCount++));
+ return new Path(bucketDir + "/sst-" + uuid + "-" + pathCount.getAndIncrement());
}
public Path toPath(String fileName) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 8cb9fc0..22eeec6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -73,12 +73,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final String commitUser;
private final ManifestCommittableSerializer committableSerializer;
-
private final FileStorePathFactory pathFactory;
private final ManifestFile manifestFile;
private final ManifestList manifestList;
- private final FileStoreOptions fileStoreOptions;
private final FileStoreScan scan;
+ private final FileStoreOptions fileStoreOptions;
@Nullable private Lock lock;
@@ -86,18 +85,17 @@ public class FileStoreCommitImpl implements FileStoreCommit {
String commitUser,
ManifestCommittableSerializer committableSerializer,
FileStorePathFactory pathFactory,
- ManifestFile manifestFile,
- ManifestList manifestList,
- FileStoreOptions fileStoreOptions,
- FileStoreScan scan) {
+ ManifestFile.Factory manifestFileFactory,
+ ManifestList.Factory manifestListFactory,
+ FileStoreScan scan,
+ FileStoreOptions fileStoreOptions) {
this.commitUser = commitUser;
this.committableSerializer = committableSerializer;
-
this.pathFactory = pathFactory;
- this.manifestFile = manifestFile;
- this.manifestList = manifestList;
- this.fileStoreOptions = fileStoreOptions;
+ this.manifestFile = manifestFileFactory.create();
+ this.manifestList = manifestListFactory.create();
this.scan = scan;
+ this.fileStoreOptions = fileStoreOptions;
this.lock = null;
}
@@ -327,9 +325,17 @@ public class FileStoreCommitImpl implements FileStoreCommit {
return;
}
+ List<BinaryRowData> changedPartitions =
+ changes.stream()
+ .map(ManifestEntry::partition)
+ .distinct()
+ .collect(Collectors.toList());
try {
- // TODO use partition filter of scan when implemented
- for (ManifestEntry entry : scan.withSnapshot(snapshotId).plan().files()) {
+ for (ManifestEntry entry :
+ scan.withSnapshot(snapshotId)
+ .withPartitionFilter(changedPartitions)
+ .plan()
+ .files()) {
removedFiles.remove(entry.identifier());
}
} catch (Throwable e) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 52edea3..2175f09 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.predicate.Predicate;
@@ -31,6 +32,8 @@ public interface FileStoreScan {
FileStoreScan withPartitionFilter(Predicate predicate);
+ FileStoreScan withPartitionFilter(List<BinaryRowData> partitions);
+
FileStoreScan withKeyFilter(Predicate predicate);
FileStoreScan withValueFilter(Predicate predicate);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
index da72dbf..b75dc35 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -18,39 +18,58 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.Equal;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.Or;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
/** Default implementation of {@link FileStoreScan}. */
public class FileStoreScanImpl implements FileStoreScan {
+ private final RowDataToObjectArrayConverter partitionConverter;
private final FileStorePathFactory pathFactory;
- private final ManifestFile manifestFile;
+ private final ManifestFile.Factory manifestFileFactory;
private final ManifestList manifestList;
private Long snapshotId;
private List<ManifestFileMeta> manifests;
+ private Predicate partitionFilter;
+ private Predicate keyFilter;
+ private Predicate valueFilter;
+ private Integer bucket;
public FileStoreScanImpl(
+ RowType partitionType,
FileStorePathFactory pathFactory,
- ManifestFile manifestFile,
- ManifestList manifestList) {
+ ManifestFile.Factory manifestFileFactory,
+ ManifestList.Factory manifestListFactory) {
+ this.partitionConverter = new RowDataToObjectArrayConverter(partitionType);
this.pathFactory = pathFactory;
- this.manifestFile = manifestFile;
- this.manifestList = manifestList;
+ this.manifestFileFactory = manifestFileFactory;
+ this.manifestList = manifestListFactory.create();
this.snapshotId = null;
this.manifests = new ArrayList<>();
@@ -58,22 +77,53 @@ public class FileStoreScanImpl implements FileStoreScan {
@Override
public FileStoreScan withPartitionFilter(Predicate predicate) {
- throw new UnsupportedOperationException();
+ this.partitionFilter = predicate;
+ return this;
+ }
+
+ @Override
+ public FileStoreScan withPartitionFilter(List<BinaryRowData> partitions) {
+ Function<BinaryRowData, Predicate> partitionToPredicate =
+ p -> {
+ List<Predicate> fieldPredicates = new ArrayList<>();
+ Object[] partitionObjects = partitionConverter.convert(p);
+ for (int i = 0; i < partitionConverter.getArity(); i++) {
+ Literal l =
+ new Literal(
+ partitionConverter.rowType().getTypeAt(i),
+ partitionObjects[i]);
+ fieldPredicates.add(new Equal(i, l));
+ }
+ return fieldPredicates.stream().reduce(And::new).get();
+ };
+ Optional<Predicate> predicate =
+ partitions.stream()
+ .filter(p -> p.getArity() > 0)
+ .map(partitionToPredicate)
+ .reduce(Or::new);
+ if (predicate.isPresent()) {
+ return withPartitionFilter(predicate.get());
+ } else {
+ return this;
+ }
}
@Override
public FileStoreScan withKeyFilter(Predicate predicate) {
- throw new UnsupportedOperationException();
+ this.keyFilter = predicate;
+ return this;
}
@Override
public FileStoreScan withValueFilter(Predicate predicate) {
- throw new UnsupportedOperationException();
+ this.valueFilter = predicate;
+ return this;
}
@Override
public FileStoreScan withBucket(int bucket) {
- throw new UnsupportedOperationException();
+ this.bucket = bucket;
+ return this;
}
@Override
@@ -109,34 +159,67 @@ public class FileStoreScanImpl implements FileStoreScan {
}
private List<ManifestEntry> scan() {
- Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
- for (ManifestFileMeta manifest : manifests) {
- // TODO read each manifest file concurrently
- for (ManifestEntry entry : manifestFile.read(manifest.fileName())) {
- ManifestEntry.Identifier identifier = entry.identifier();
- switch (entry.kind()) {
- case ADD:
- Preconditions.checkState(
- !map.containsKey(identifier),
- "Trying to add file %s which is already added. "
- + "Manifest might be corrupted.",
- identifier);
- map.put(identifier, entry);
- break;
- case DELETE:
- Preconditions.checkState(
- map.containsKey(identifier),
- "Trying to delete file %s which is not previously added. "
- + "Manifest might be corrupted.",
- identifier);
- map.remove(identifier);
- break;
- default:
- throw new UnsupportedOperationException(
- "Unknown value kind " + entry.kind().name());
- }
+ List<ManifestEntry> entries;
+ try {
+ entries =
+ FileUtils.COMMON_IO_FORK_JOIN_POOL
+ .submit(
+ () ->
+ manifests
+ .parallelStream()
+ .filter(this::filterManifestFileMeta)
+ .flatMap(m -> readManifestFileMeta(m).stream())
+ .filter(this::filterManifestEntry)
+ .collect(Collectors.toList()))
+ .get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Failed to read ManifestEntry list concurrently", e);
+ }
+
+ Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
+ for (ManifestEntry entry : entries) {
+ ManifestEntry.Identifier identifier = entry.identifier();
+ switch (entry.kind()) {
+ case ADD:
+ Preconditions.checkState(
+ !map.containsKey(identifier),
+ "Trying to add file %s which is already added. "
+ + "Manifest might be corrupted.",
+ identifier);
+ map.put(identifier, entry);
+ break;
+ case DELETE:
+ Preconditions.checkState(
+ map.containsKey(identifier),
+ "Trying to delete file %s which is not previously added. "
+ + "Manifest might be corrupted.",
+ identifier);
+ map.remove(identifier);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown value kind " + entry.kind().name());
}
}
return new ArrayList<>(map.values());
}
+
+ private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
+ return partitionFilter == null
+ || partitionFilter.test(
+ manifest.numAddedFiles() + manifest.numDeletedFiles(),
+ manifest.partitionStats());
+ }
+
+ private boolean filterManifestEntry(ManifestEntry entry) {
+ // TODO apply key & value filter after field stats are collected in
+ // SstFile.RollingFile#finish
+ return (partitionFilter == null
+ || partitionFilter.test(partitionConverter.convert(entry.partition())))
+ && (bucket == null || entry.bucket() == bucket);
+ }
+
+ private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
+ return manifestFileFactory.create().read(manifest.fileName());
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
new file mode 100644
index 0000000..5734166
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeFactory;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Default implementation of {@link FileStoreWrite}. */
+public class FileStoreWriteImpl implements FileStoreWrite {
+
+ private final FileStorePathFactory pathFactory;
+ private final MergeTreeFactory mergeTreeFactory;
+ private final FileStoreScan scan;
+
+ public FileStoreWriteImpl(
+ FileStorePathFactory pathFactory,
+ MergeTreeFactory mergeTreeFactory,
+ FileStoreScan scan) {
+ this.pathFactory = pathFactory;
+ this.mergeTreeFactory = mergeTreeFactory;
+ this.scan = scan;
+ }
+
+ @Override
+ public RecordWriter createWriter(
+ BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+ Long latestSnapshotId = pathFactory.latestSnapshotId();
+ if (latestSnapshotId == null) {
+ return createEmptyWriter(partition, bucket, compactExecutor);
+ } else {
+ MergeTree mergeTree = mergeTreeFactory.create(partition, bucket, compactExecutor);
+ return mergeTree.createWriter(
+ scan.withSnapshot(latestSnapshotId)
+ .withPartitionFilter(Collections.singletonList(partition))
+ .withBucket(bucket).plan().files().stream()
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList()));
+ }
+ }
+
+ @Override
+ public RecordWriter createEmptyWriter(
+ BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+ MergeTree mergeTree = mergeTreeFactory.create(partition, bucket, compactExecutor);
+ return mergeTree.createWriter(Collections.emptyList());
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
index b1a2aa2..2ddf607 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -29,18 +30,17 @@ import org.apache.flink.table.types.logical.RowType;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.IntStream;
/** Serializer for array of {@link FieldStats}. */
public class FieldStatsArraySerializer extends ObjectSerializer<FieldStats[]> {
private static final long serialVersionUID = 1L;
- private final RowData.FieldGetter[] fieldGetters;
+ private final RowDataToObjectArrayConverter converter;
public FieldStatsArraySerializer(RowType rowType) {
super(schema(rowType));
- this.fieldGetters = createFieldGetters(toAllFieldsNullableRowType(rowType));
+ this.converter = new RowDataToObjectArrayConverter(toAllFieldsNullableRowType(rowType));
}
@Override
@@ -59,18 +59,16 @@ public class FieldStatsArraySerializer extends ObjectSerializer<FieldStats[]> {
@Override
public FieldStats[] fromRow(RowData row) {
- int rowFieldCount = fieldGetters.length;
+ int rowFieldCount = converter.getArity();
RowData minValues = row.getRow(0, rowFieldCount);
+ Object[] minValueObjects = converter.convert(minValues);
RowData maxValues = row.getRow(1, rowFieldCount);
+ Object[] maxValueObjects = converter.convert(maxValues);
long[] nullValues = row.getArray(2).toLongArray();
FieldStats[] stats = new FieldStats[rowFieldCount];
for (int i = 0; i < rowFieldCount; i++) {
- stats[i] =
- new FieldStats(
- fieldGetters[i].getFieldOrNull(minValues),
- fieldGetters[i].getFieldOrNull(maxValues),
- nullValues[i]);
+ stats[i] = new FieldStats(minValueObjects[i], maxValueObjects[i], nullValues[i]);
}
return stats;
}
@@ -84,12 +82,6 @@ public class FieldStatsArraySerializer extends ObjectSerializer<FieldStats[]> {
return new RowType(fields);
}
- public static RowData.FieldGetter[] createFieldGetters(RowType rowType) {
- return IntStream.range(0, rowType.getFieldCount())
- .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
- .toArray(RowData.FieldGetter[]::new);
- }
-
private static RowType toAllFieldsNullableRowType(RowType rowType) {
// as stated in SstFile.RollingFile#finish, field stats are not collected currently so
// min/max values are all nulls
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
index d7fa281..89ed0f9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsCollector.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.store.file.stats;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.utils.RowDataToObjectArrayConverter;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
/** Collector to extract statistics of each fields from a series of records. */
public class FieldStatsCollector {
@@ -28,15 +28,14 @@ public class FieldStatsCollector {
private final Object[] minValues;
private final Object[] maxValues;
private final long[] nullCounts;
-
- private final RowData.FieldGetter[] fieldGetters;
+ private final RowDataToObjectArrayConverter converter;
public FieldStatsCollector(RowType rowType) {
int numFields = rowType.getFieldCount();
this.minValues = new Object[numFields];
this.maxValues = new Object[numFields];
this.nullCounts = new long[numFields];
- this.fieldGetters = FieldStatsArraySerializer.createFieldGetters(rowType);
+ this.converter = new RowDataToObjectArrayConverter(rowType);
}
/**
@@ -46,13 +45,9 @@ public class FieldStatsCollector {
* the collector.
*/
public void collect(RowData row) {
- Preconditions.checkArgument(
- fieldGetters.length == row.getArity(),
- "Expecting row data with %d fields but found row data with %d fields",
- fieldGetters.length,
- row.getArity());
+ Object[] objects = converter.convert(row);
for (int i = 0; i < row.getArity(); i++) {
- Object obj = fieldGetters[i].getFieldOrNull(row);
+ Object obj = objects[i];
if (obj == null) {
nullCounts[i]++;
continue;
@@ -73,7 +68,7 @@ public class FieldStatsCollector {
}
public FieldStats[] extract() {
- FieldStats[] stats = new FieldStats[fieldGetters.length];
+ FieldStats[] stats = new FieldStats[nullCounts.length];
for (int i = 0; i < stats.length; i++) {
stats[i] = new FieldStats(minValues[i], maxValues[i], nullCounts[i]);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index 9aa9844..679e367 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -37,11 +37,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
/** Factory which produces {@link Path}s for each type of files. */
+@ThreadSafe
public class FileStorePathFactory {
private static final Logger LOG = LoggerFactory.getLogger(FileStorePathFactory.class);
@@ -51,8 +54,8 @@ public class FileStorePathFactory {
private final String uuid;
private final RowDataPartitionComputer partitionComputer;
- private int manifestFileCount;
- private int manifestListCount;
+ private final AtomicInteger manifestFileCount;
+ private final AtomicInteger manifestListCount;
public FileStorePathFactory(Path root) {
this(root, RowType.of(), FileSystemConnectorOptions.PARTITION_DEFAULT_NAME.defaultValue());
@@ -73,16 +76,22 @@ public class FileStorePathFactory {
.toArray(DataType[]::new),
partitionColumns);
- this.manifestFileCount = 0;
- this.manifestListCount = 0;
+ this.manifestFileCount = new AtomicInteger(0);
+ this.manifestListCount = new AtomicInteger(0);
}
public Path newManifestFile() {
- return new Path(root + "/manifest/manifest-" + uuid + "-" + (manifestFileCount++));
+ return new Path(
+ root + "/manifest/manifest-" + uuid + "-" + manifestFileCount.getAndIncrement());
}
public Path newManifestList() {
- return new Path(root + "/manifest/manifest-list-" + uuid + "-" + (manifestListCount++));
+ return new Path(
+ root
+ + "/manifest/manifest-list-"
+ + uuid
+ + "-"
+ + manifestListCount.getAndIncrement());
}
public Path toManifestFilePath(String manifestFileName) {
@@ -105,6 +114,7 @@ public class FileStorePathFactory {
return new SstPathFactory(root, getPartitionString(partition), bucket);
}
+ /** IMPORTANT: This method is NOT THREAD SAFE, despite the class-level {@code @ThreadSafe}. */
public String getPartitionString(BinaryRowData partition) {
return PartitionPathUtils.generatePartitionPath(
partitionComputer.generatePartValues(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
index 33f78b5..50cce1d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
@@ -39,6 +39,8 @@ import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
/** Utils for file reading and writing. */
public class FileUtils {
@@ -51,6 +53,22 @@ public class FileUtils {
DEFAULT_READER_CONFIG.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
}
+ public static final ForkJoinPool COMMON_IO_FORK_JOIN_POOL;
+
+ // if we want to name threads in the fork join pool we need all these
+ // see https://stackoverflow.com/questions/34303094/
+ static {
+ ForkJoinPool.ForkJoinWorkerThreadFactory factory =
+ pool -> {
+ ForkJoinWorkerThread worker =
+ ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
+ worker.setName("file-store-common-io-" + worker.getPoolIndex());
+ return worker;
+ };
+ COMMON_IO_FORK_JOIN_POOL =
+ new ForkJoinPool(Runtime.getRuntime().availableProcessors(), factory, null, false);
+ }
+
public static <T> List<T> readListFromFile(
Path path,
ObjectSerializer<T> serializer,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataToObjectArrayConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataToObjectArrayConverter.java
new file mode 100644
index 0000000..0def1c1
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataToObjectArrayConverter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.stream.IntStream;
+
+/** Converts a {@link RowData} into an {@code Object[]} holding each field value in order. */
+public class RowDataToObjectArrayConverter {
+
+ private final RowType rowType;
+ private final RowData.FieldGetter[] fieldGetters;
+
+ public RowDataToObjectArrayConverter(RowType rowType) {
+ this.rowType = rowType;
+ this.fieldGetters =
+ IntStream.range(0, rowType.getFieldCount())
+ .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
+ .toArray(RowData.FieldGetter[]::new);
+ }
+
+ public RowType rowType() {
+ return rowType;
+ }
+
+ public int getArity() {
+ return fieldGetters.length;
+ }
+
+ public Object[] convert(RowData rowData) {
+ Object[] result = new Object[fieldGetters.length];
+ for (int i = 0; i < fieldGetters.length; i++) {
+ result[i] = fieldGetters[i].getFieldOrNull(rowData);
+ }
+ return result;
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index 027dda5..8ea0d88 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -135,12 +135,13 @@ public class ManifestFileMetaTest {
}
private ManifestFile createManifestFile(String path) {
- return new ManifestFile(
- PARTITION_TYPE,
- KEY_TYPE,
- ROW_TYPE,
- avro,
- new FileStorePathFactory(new Path(path), PARTITION_TYPE, "default"));
+ return new ManifestFile.Factory(
+ PARTITION_TYPE,
+ KEY_TYPE,
+ ROW_TYPE,
+ avro,
+ new FileStorePathFactory(new Path(path), PARTITION_TYPE, "default"))
+ .create();
}
private void createData(List<ManifestFileMeta> input, List<ManifestFileMeta> expected) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index ee09d9a..99a8b40 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -91,12 +91,13 @@ public class ManifestFileTest {
FileStorePathFactory pathFactory =
new FileStorePathFactory(
new Path(path), TestKeyValueGenerator.PARTITION_TYPE, "default");
- return new ManifestFile(
- TestKeyValueGenerator.PARTITION_TYPE,
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
- avro,
- pathFactory);
+ return new ManifestFile.Factory(
+ TestKeyValueGenerator.PARTITION_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.ROW_TYPE,
+ avro,
+ pathFactory)
+ .create();
}
private void checkMetaIgnoringFileNameAndSize(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index e9df1c1..6b295c7 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -94,6 +94,7 @@ public class ManifestListTest {
FileStorePathFactory pathFactory =
new FileStorePathFactory(
new Path(path), TestKeyValueGenerator.PARTITION_TYPE, "default");
- return new ManifestList(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory);
+ return new ManifestList.Factory(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory)
+ .create();
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 74cabd1..ca85674 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowDataUtil;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.ValueKind;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
@@ -31,7 +32,7 @@ import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
import org.apache.flink.table.store.file.mergetree.sst.SstFile;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
import org.apache.flink.table.store.file.mergetree.sst.SstFileTest;
-import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.RecordWriter;
@@ -70,7 +71,7 @@ public class MergeTreeTest {
private static ExecutorService service;
- private SstPathFactory fileFactory;
+ private FileStorePathFactory pathFactory;
private Comparator<RowData> comparator;
@@ -82,12 +83,11 @@ public class MergeTreeTest {
@BeforeEach
public void beforeEach() throws IOException {
- fileFactory = new SstPathFactory(new Path(tempDir.toString()), null, 123);
- Path bucketDir = fileFactory.toPath("ignore").getParent();
- bucketDir.getFileSystem().mkdirs(bucketDir);
-
+ pathFactory = new FileStorePathFactory(new Path(tempDir.toString()));
comparator = Comparator.comparingInt(o -> o.getInt(0));
recreateWriter(1024 * 1024);
+ Path bucketDir = sstFile.pathFactory().toPath("ignore").getParent();
+ bucketDir.getFileSystem().mkdirs(bucketDir);
}
private void recreateWriter(long targetFileSize) {
@@ -97,12 +97,15 @@ public class MergeTreeTest {
configuration.set(MergeTreeOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize));
MergeTreeOptions options = new MergeTreeOptions(configuration);
sstFile =
- new SstFile(
- new RowType(singletonList(new RowType.RowField("k", new IntType()))),
- new RowType(singletonList(new RowType.RowField("v", new IntType()))),
- new SstFileTest.FlushingAvroFormat(),
- fileFactory,
- options.targetFileSize);
+ new SstFile.Factory(
+ new RowType(
+ singletonList(new RowType.RowField("k", new IntType()))),
+ new RowType(
+ singletonList(new RowType.RowField("v", new IntType()))),
+ new SstFileTest.FlushingAvroFormat(),
+ pathFactory,
+ options.targetFileSize)
+ .create(BinaryRowDataUtil.EMPTY_ROW, 0);
mergeTree =
new MergeTree(options, sstFile, comparator, service, new DeduplicateAccumulator());
writer = mergeTree.createWriter(new ArrayList<>());
@@ -160,7 +163,7 @@ public class MergeTreeTest {
doTestWriteRead(6);
List<SstFileMeta> files = writer.close();
for (SstFileMeta file : files) {
- Path path = fileFactory.toPath(file.fileName());
+ Path path = sstFile.pathFactory().toPath(file.fileName());
assertThat(path.getFileSystem().exists(path)).isFalse();
}
}
@@ -233,7 +236,7 @@ public class MergeTreeTest {
// assert records from increment compacted files
assertRecords(expected, compactedFiles, true);
- Path bucketDir = fileFactory.toPath("ignore").getParent();
+ Path bucketDir = sstFile.pathFactory().toPath("ignore").getParent();
Set<String> files =
Arrays.stream(bucketDir.getFileSystem().listStatus(bucketDir))
.map(FileStatus::getPath)
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
index b8fecd3..c09545c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
@@ -118,19 +118,18 @@ public class SstFileTest {
}
private SstFile createSstFile(String path) {
- FileStorePathFactory fileStorePathFactory = new FileStorePathFactory(new Path(path));
- SstPathFactory sstPathFactory =
- fileStorePathFactory.createSstPathFactory(BinaryRowDataUtil.EMPTY_ROW, 0);
+ FileStorePathFactory pathFactory = new FileStorePathFactory(new Path(path));
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
- return new SstFile(
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
- // normal avro format will buffer changes in memory and we can't determine
- // if the written file size is really larger than suggested, so we use a
- // special avro format which flushes for every added element
- flushingAvro,
- sstPathFactory,
- suggestedFileSize);
+ return new SstFile.Factory(
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.ROW_TYPE,
+ // normal avro format will buffer changes in memory and we can't determine
+ // if the written file size is really larger than suggested, so we use a
+ // special avro format which flushes for every added element
+ flushingAvro,
+ pathFactory,
+ suggestedFileSize)
+ .create(BinaryRowDataUtil.EMPTY_ROW, 0);
}
private void checkRollingFiles(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
index 0c8f678..0851ff4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
@@ -25,13 +25,8 @@ import org.apache.flink.table.store.file.FileFormat;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.manifest.ManifestEntry;
-import org.apache.flink.table.store.file.manifest.ManifestFile;
-import org.apache.flink.table.store.file.manifest.ManifestList;
-import org.apache.flink.table.store.file.mergetree.sst.SstFile;
import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
import org.junit.jupiter.api.BeforeEach;
@@ -71,7 +66,7 @@ public abstract class FileStoreCommitTestBase {
root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
}
- protected abstract String getSchema();
+ protected abstract String getScheme();
@RepeatedTest(10)
public void testSingleCommitUser() throws Exception {
@@ -94,7 +89,7 @@ public abstract class FileStoreCommitTestBase {
.collect(Collectors.toList()),
"input");
Map<BinaryRowData, BinaryRowData> expected =
- toKvMap(
+ OperationTestUtils.toKvMap(
data.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList()));
@@ -114,7 +109,10 @@ public abstract class FileStoreCommitTestBase {
for (int i = 0; i < numThreads; i++) {
TestCommitThread thread =
new TestCommitThread(
- dataPerThread.get(i), createTestPathFactory(), createSafePathFactory());
+ dataPerThread.get(i),
+ OperationTestUtils.createPathFactory(getScheme(), tempDir.toString()),
+ OperationTestUtils.createPathFactory(
+ TestAtomicRenameFileSystem.SCHEME, tempDir.toString()));
thread.start();
threads.add(thread);
}
@@ -123,7 +121,16 @@ public abstract class FileStoreCommitTestBase {
}
// read actual data and compare
- Map<BinaryRowData, BinaryRowData> actual = toKvMap(readKvsFromLatestSnapshot());
+ FileStorePathFactory safePathFactory =
+ OperationTestUtils.createPathFactory(
+ TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+ Long snapshotId = safePathFactory.latestSnapshotId();
+ assertThat(snapshotId).isNotNull();
+ List<KeyValue> actualKvs =
+ OperationTestUtils.readKvsFromSnapshot(snapshotId, avro, safePathFactory);
+ gen.sort(actualKvs);
+ logData(() -> actualKvs, "raw read results");
+ Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
logData(() -> kvMapToKvList(expected), "expected");
logData(() -> kvMapToKvList(actual), "actual");
assertThat(actual).isEqualTo(expected);
@@ -138,90 +145,6 @@ public abstract class FileStoreCommitTestBase {
return data;
}
- private List<KeyValue> readKvsFromLatestSnapshot() throws IOException {
- FileStorePathFactory pathFactory = createSafePathFactory();
- Long latestSnapshotId = pathFactory.latestSnapshotId();
- assertThat(latestSnapshotId).isNotNull();
-
- ManifestFile manifestFile =
- new ManifestFile(
- TestKeyValueGenerator.PARTITION_TYPE,
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
- avro,
- pathFactory);
- ManifestList manifestList =
- new ManifestList(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory);
-
- List<KeyValue> kvs = new ArrayList<>();
- List<ManifestEntry> entries =
- new FileStoreScanImpl(pathFactory, manifestFile, manifestList)
- .withSnapshot(latestSnapshotId)
- .plan()
- .files();
- for (ManifestEntry entry : entries) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reading actual key-values from file " + entry.file().fileName());
- }
- SstFile sstFile =
- new SstFile(
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
- avro,
- pathFactory.createSstPathFactory(entry.partition(), 0),
- 1024 * 1024 // not used
- );
- RecordReaderIterator iterator =
- new RecordReaderIterator(sstFile.read(entry.file().fileName()));
- while (iterator.hasNext()) {
- kvs.add(
- iterator.next()
- .copy(
- TestKeyValueGenerator.KEY_SERIALIZER,
- TestKeyValueGenerator.ROW_SERIALIZER));
- }
- }
-
- gen.sort(kvs);
- logData(() -> kvs, "raw read results");
- return kvs;
- }
-
- private Map<BinaryRowData, BinaryRowData> toKvMap(List<KeyValue> kvs) {
- Map<BinaryRowData, BinaryRowData> result = new HashMap<>();
- for (KeyValue kv : kvs) {
- BinaryRowData key = TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(kv.key()).copy();
- BinaryRowData value =
- TestKeyValueGenerator.ROW_SERIALIZER.toBinaryRow(kv.value()).copy();
- switch (kv.valueKind()) {
- case ADD:
- result.put(key, value);
- break;
- case DELETE:
- result.remove(key);
- break;
- default:
- throw new UnsupportedOperationException(
- "Unknown value kind " + kv.valueKind().name());
- }
- }
- return result;
- }
-
- private FileStorePathFactory createTestPathFactory() {
- return new FileStorePathFactory(
- new Path(getSchema() + "://" + tempDir.toString()),
- TestKeyValueGenerator.PARTITION_TYPE,
- "default");
- }
-
- private FileStorePathFactory createSafePathFactory() {
- return new FileStorePathFactory(
- new Path(TestAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString()),
- TestKeyValueGenerator.PARTITION_TYPE,
- "default");
- }
-
private List<KeyValue> kvMapToKvList(Map<BinaryRowData, BinaryRowData> map) {
return map.entrySet().stream()
.map(e -> new KeyValue().replace(e.getKey(), -1, ValueKind.ADD, e.getValue()))
@@ -244,7 +167,7 @@ public abstract class FileStoreCommitTestBase {
public static class WithTestAtomicRenameFileSystem extends FileStoreCommitTestBase {
@Override
- protected String getSchema() {
+ protected String getScheme() {
return TestAtomicRenameFileSystem.SCHEME;
}
}
@@ -261,7 +184,7 @@ public abstract class FileStoreCommitTestBase {
}
@Override
- protected String getSchema() {
+ protected String getScheme() {
return FailingAtomicRenameFileSystem.SCHEME;
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
new file mode 100644
index 0000000..251c853
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileStoreScanImpl}. */
+public class FileStoreScanTest {
+
+ private static final int NUM_BUCKETS = 10;
+
+ private final FileFormat avro =
+ FileFormat.fromIdentifier(
+ FileStoreCommitTestBase.class.getClassLoader(), "avro", new Configuration());
+
+ private TestKeyValueGenerator gen;
+ @TempDir java.nio.file.Path tempDir;
+ private FileStorePathFactory pathFactory;
+
+ @BeforeEach
+ public void beforeEach() throws IOException {
+ gen = new TestKeyValueGenerator();
+ pathFactory =
+ OperationTestUtils.createPathFactory(
+ TestAtomicRenameFileSystem.SCHEME, tempDir.toString());
+ Path root = new Path(tempDir.toString());
+ root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
+ }
+
+ @RepeatedTest(10)
+ public void testWithPartitionFilter() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ List<KeyValue> data = generateData(random.nextInt(1000) + 1);
+ List<BinaryRowData> partitions =
+ data.stream()
+ .map(kv -> gen.getPartition(kv))
+ .distinct()
+ .collect(Collectors.toList());
+ Snapshot snapshot = writeData(data);
+
+ Set<BinaryRowData> wantedPartitions = new HashSet<>();
+ for (int i = random.nextInt(partitions.size() + 1); i > 0; i--) {
+ wantedPartitions.add(partitions.get(random.nextInt(partitions.size())));
+ }
+
+ FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+ scan.withSnapshot(snapshot.id());
+ scan.withPartitionFilter(new ArrayList<>(wantedPartitions));
+
+ Map<BinaryRowData, BinaryRowData> expected =
+ OperationTestUtils.toKvMap(
+ wantedPartitions.isEmpty()
+ ? data
+ : data.stream()
+ .filter(
+ kv ->
+ wantedPartitions.contains(
+ gen.getPartition(kv)))
+ .collect(Collectors.toList()));
+ runTest(scan, snapshot.id(), expected);
+ }
+
+ @RepeatedTest(10)
+ public void testWithBucket() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ List<KeyValue> data = generateData(random.nextInt(1000) + 1);
+ Snapshot snapshot = writeData(data);
+
+ int wantedBucket = random.nextInt(NUM_BUCKETS);
+
+ FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+ scan.withSnapshot(snapshot.id());
+ scan.withBucket(wantedBucket);
+
+ Map<BinaryRowData, BinaryRowData> expected =
+ OperationTestUtils.toKvMap(
+ data.stream()
+ .filter(kv -> getBucket(kv) == wantedBucket)
+ .collect(Collectors.toList()));
+ runTest(scan, snapshot.id(), expected);
+ }
+
+ @RepeatedTest(10)
+ public void testWithSnapshot() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int numCommits = random.nextInt(10) + 1;
+ int wantedCommit = random.nextInt(numCommits);
+
+ List<Snapshot> snapshots = new ArrayList<>();
+ List<List<KeyValue>> allData = new ArrayList<>();
+ for (int i = 0; i < numCommits; i++) {
+ List<KeyValue> data = generateData(random.nextInt(100) + 1);
+ snapshots.add(writeData(data));
+ allData.add(data);
+ }
+ long wantedSnapshot = snapshots.get(wantedCommit).id();
+
+ FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+ scan.withSnapshot(wantedSnapshot);
+
+ Map<BinaryRowData, BinaryRowData> expected =
+ OperationTestUtils.toKvMap(
+ allData.subList(0, wantedCommit + 1).stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()));
+ runTest(scan, wantedSnapshot, expected);
+ }
+
+ @RepeatedTest(10)
+ public void testWithManifestList() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int numCommits = random.nextInt(10) + 1;
+ for (int i = 0; i < numCommits; i++) {
+ List<KeyValue> data = generateData(random.nextInt(100) + 1);
+ writeData(data);
+ }
+
+ ManifestList manifestList =
+ new ManifestList.Factory(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory)
+ .create();
+ long wantedSnapshot = random.nextLong(pathFactory.latestSnapshotId()) + 1;
+ List<ManifestFileMeta> wantedManifests =
+ manifestList.read(
+ Snapshot.fromPath(pathFactory.toSnapshotPath(wantedSnapshot))
+ .manifestList());
+
+ FileStoreScan scan = OperationTestUtils.createScan(avro, pathFactory);
+ scan.withManifestList(wantedManifests);
+
+ List<KeyValue> expectedKvs =
+ OperationTestUtils.readKvsFromSnapshot(wantedSnapshot, avro, pathFactory);
+ gen.sort(expectedKvs);
+ Map<BinaryRowData, BinaryRowData> expected = OperationTestUtils.toKvMap(expectedKvs);
+ runTest(scan, null, expected);
+ }
+
+ private void runTest(
+ FileStoreScan scan, Long expectedSnapshotId, Map<BinaryRowData, BinaryRowData> expected)
+ throws Exception {
+ FileStoreScan.Plan plan = scan.plan();
+ assertThat(plan.snapshotId()).isEqualTo(expectedSnapshotId);
+
+ List<KeyValue> actualKvs =
+ OperationTestUtils.readKvsFromManifestEntries(plan.files(), avro, pathFactory);
+ gen.sort(actualKvs);
+ Map<BinaryRowData, BinaryRowData> actual = OperationTestUtils.toKvMap(actualKvs);
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private List<KeyValue> generateData(int numRecords) {
+ List<KeyValue> data = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ data.add(gen.next());
+ }
+ return data;
+ }
+
+ private Snapshot writeData(List<KeyValue> kvs) throws Exception {
+ FileStoreWrite write = OperationTestUtils.createWrite(avro, pathFactory);
+ Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>();
+ for (KeyValue kv : kvs) {
+ BinaryRowData partition = gen.getPartition(kv);
+ int bucket = getBucket(kv);
+ writers.compute(partition, (p, m) -> m == null ? new HashMap<>() : m)
+ .compute(
+ bucket,
+ (b, w) -> {
+ if (w == null) {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ return write.createWriter(partition, bucket, service);
+ } else {
+ return w;
+ }
+ })
+ .write(kv.valueKind(), kv.key(), kv.value());
+ }
+
+ FileStoreCommit commit = OperationTestUtils.createCommit(avro, pathFactory);
+ ManifestCommittable committable = new ManifestCommittable();
+ for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> entryWithPartition :
+ writers.entrySet()) {
+ for (Map.Entry<Integer, RecordWriter> entryWithBucket :
+ entryWithPartition.getValue().entrySet()) {
+ Increment increment = entryWithBucket.getValue().prepareCommit();
+ committable.add(entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
+ }
+ }
+ commit.commit(committable, Collections.emptyMap());
+ writers.values().stream()
+ .flatMap(m -> m.values().stream())
+ .forEach(
+ w -> {
+ try {
+ w.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ return Snapshot.fromPath(pathFactory.toSnapshotPath(pathFactory.latestSnapshotId()));
+ }
+
+ private int getBucket(KeyValue kv) {
+ return (kv.key().hashCode() % NUM_BUCKETS + NUM_BUCKETS) % NUM_BUCKETS;
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
new file mode 100644
index 0000000..bce303b
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/OperationTestUtils.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.MergeTreeFactory;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Utils for operation tests.
+ *
+ * <p>Provides static factories for the scan, commit and write operations wired with the types
+ * of {@link TestKeyValueGenerator}, plus helpers to read written records back.
+ */
+public class OperationTestUtils {
+
+    /** Static helpers only; not meant to be instantiated. */
+    private OperationTestUtils() {}
+
+    /**
+     * Creates merge tree options with a 16 kb write buffer, 4 kb pages and a 1 kb target file
+     * size.
+     *
+     * @param forceCompact value for {@link MergeTreeOptions#COMMIT_FORCE_COMPACT}
+     */
+    public static MergeTreeOptions getMergeTreeOptions(boolean forceCompact) {
+        Configuration conf = new Configuration();
+        conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, MemorySize.parse("16 kb"));
+        conf.set(MergeTreeOptions.PAGE_SIZE, MemorySize.parse("4 kb"));
+        conf.set(MergeTreeOptions.TARGET_FILE_SIZE, MemorySize.parse("1 kb"));
+        conf.set(MergeTreeOptions.COMMIT_FORCE_COMPACT, forceCompact);
+        return new MergeTreeOptions(conf);
+    }
+
+    /**
+     * Creates file store options with a single bucket and a manifest target file size chosen
+     * randomly between 1 kb and 16 kb (inclusive) on every call.
+     */
+    private static FileStoreOptions getFileStoreOptions() {
+        Configuration conf = new Configuration();
+        conf.set(FileStoreOptions.BUCKET, 1);
+        conf.set(
+                FileStoreOptions.MANIFEST_TARGET_FILE_SIZE,
+                MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) + "kb"));
+        return new FileStoreOptions(conf);
+    }
+
+    /** Creates a {@link FileStoreScan} over the files located by {@code pathFactory}. */
+    public static FileStoreScan createScan(
+            FileFormat fileFormat, FileStorePathFactory pathFactory) {
+        return new FileStoreScanImpl(
+                TestKeyValueGenerator.PARTITION_TYPE,
+                pathFactory,
+                createManifestFileFactory(fileFormat, pathFactory),
+                createManifestListFactory(fileFormat, pathFactory));
+    }
+
+    /**
+     * Creates a {@link FileStoreCommit} with a freshly generated random UUID as its first
+     * constructor argument, its own scan and the options from {@link #getFileStoreOptions()}.
+     */
+    public static FileStoreCommit createCommit(
+            FileFormat fileFormat, FileStorePathFactory pathFactory) {
+        ManifestCommittableSerializer serializer =
+                new ManifestCommittableSerializer(
+                        TestKeyValueGenerator.PARTITION_TYPE,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.ROW_TYPE);
+        ManifestFile.Factory testManifestFileFactory =
+                createManifestFileFactory(fileFormat, pathFactory);
+        ManifestList.Factory testManifestListFactory =
+                createManifestListFactory(fileFormat, pathFactory);
+        return new FileStoreCommitImpl(
+                UUID.randomUUID().toString(),
+                serializer,
+                pathFactory,
+                testManifestFileFactory,
+                testManifestListFactory,
+                createScan(fileFormat, pathFactory),
+                getFileStoreOptions());
+    }
+
+    /**
+     * Creates a {@link FileStoreWrite} backed by a merge tree that uses a {@link
+     * DeduplicateAccumulator} and the options of {@code getMergeTreeOptions(false)}.
+     */
+    public static FileStoreWrite createWrite(
+            FileFormat fileFormat, FileStorePathFactory pathFactory) {
+        MergeTreeFactory mergeTreeFactory =
+                new MergeTreeFactory(
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.ROW_TYPE,
+                        TestKeyValueGenerator.KEY_COMPARATOR,
+                        new DeduplicateAccumulator(),
+                        fileFormat,
+                        pathFactory,
+                        OperationTestUtils.getMergeTreeOptions(false));
+        return new FileStoreWriteImpl(
+                pathFactory, mergeTreeFactory, createScan(fileFormat, pathFactory));
+    }
+
+    /**
+     * Creates a path factory rooted at {@code scheme + "://" + root}, passing {@code "default"}
+     * as the last constructor argument.
+     */
+    public static FileStorePathFactory createPathFactory(String scheme, String root) {
+        return new FileStorePathFactory(
+                new Path(scheme + "://" + root), TestKeyValueGenerator.PARTITION_TYPE, "default");
+    }
+
+    /** Creates a manifest file factory for the partition, key and row types used in tests. */
+    private static ManifestFile.Factory createManifestFileFactory(
+            FileFormat fileFormat, FileStorePathFactory pathFactory) {
+        return new ManifestFile.Factory(
+                TestKeyValueGenerator.PARTITION_TYPE,
+                TestKeyValueGenerator.KEY_TYPE,
+                TestKeyValueGenerator.ROW_TYPE,
+                fileFormat,
+                pathFactory);
+    }
+
+    /** Creates a manifest list factory for the partition type used in tests. */
+    private static ManifestList.Factory createManifestListFactory(
+            FileFormat fileFormat, FileStorePathFactory pathFactory) {
+        return new ManifestList.Factory(
+                TestKeyValueGenerator.PARTITION_TYPE, fileFormat, pathFactory);
+    }
+
+    /**
+     * Plans a scan of the snapshot with the given id and reads every record of the planned
+     * files.
+     *
+     * @throws IOException if reading a file fails
+     */
+    public static List<KeyValue> readKvsFromSnapshot(
+            long snapshotId, FileFormat fileFormat, FileStorePathFactory pathFactory)
+            throws IOException {
+        List<ManifestEntry> entries =
+                createScan(fileFormat, pathFactory).withSnapshot(snapshotId).plan().files();
+        return readKvsFromManifestEntries(entries, fileFormat, pathFactory);
+    }
+
+    /**
+     * Reads the records of the file referenced by each manifest entry, in entry order, and
+     * returns copies of them.
+     *
+     * @throws IOException if reading a file fails
+     */
+    public static List<KeyValue> readKvsFromManifestEntries(
+            List<ManifestEntry> entries, FileFormat fileFormat, FileStorePathFactory pathFactory)
+            throws IOException {
+        List<KeyValue> kvs = new ArrayList<>();
+        SstFile.Factory sstFileFactory =
+                new SstFile.Factory(
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.ROW_TYPE,
+                        fileFormat,
+                        pathFactory,
+                        1024 * 1024 // not used
+                        );
+        for (ManifestEntry entry : entries) {
+            SstFile sstFile = sstFileFactory.create(entry.partition(), entry.bucket());
+            // NOTE(review): the iterator is never closed here; if RecordReaderIterator holds
+            // file resources it should be closed after the loop - confirm against its class.
+            RecordReaderIterator iterator =
+                    new RecordReaderIterator(sstFile.read(entry.file().fileName()));
+            while (iterator.hasNext()) {
+                // Copy each record before storing it; presumably the reader reuses the
+                // returned object between calls - TODO confirm.
+                kvs.add(
+                        iterator.next()
+                                .copy(
+                                        TestKeyValueGenerator.KEY_SERIALIZER,
+                                        TestKeyValueGenerator.ROW_SERIALIZER));
+            }
+        }
+        return kvs;
+    }
+
+    /**
+     * Replays the records in list order into a map: {@code ADD} puts the value under its key and
+     * {@code DELETE} removes the key.
+     *
+     * @throws UnsupportedOperationException if a record has any other value kind
+     */
+    public static Map<BinaryRowData, BinaryRowData> toKvMap(List<KeyValue> kvs) {
+        Map<BinaryRowData, BinaryRowData> result = new HashMap<>();
+        for (KeyValue kv : kvs) {
+            BinaryRowData key = TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(kv.key()).copy();
+            BinaryRowData value =
+                    TestKeyValueGenerator.ROW_SERIALIZER.toBinaryRow(kv.value()).copy();
+            switch (kv.valueKind()) {
+                case ADD:
+                    result.put(key, value);
+                    break;
+                case DELETE:
+                    result.remove(key);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown value kind " + kv.valueKind().name());
+            }
+        }
+        return result;
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 334acfd..5d96cf5 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -19,22 +19,11 @@
package org.apache.flink.table.store.file.operation;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MemorySize;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.FileFormat;
-import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.TestKeyValueGenerator;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
-import org.apache.flink.table.store.file.manifest.ManifestEntry;
-import org.apache.flink.table.store.file.manifest.ManifestFile;
-import org.apache.flink.table.store.file.manifest.ManifestList;
-import org.apache.flink.table.store.file.mergetree.MergeTree;
-import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
-import org.apache.flink.table.store.file.mergetree.sst.SstFile;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.slf4j.Logger;
@@ -45,37 +34,19 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
/** Testing {@link Thread}s to perform concurrent commits. */
public class TestCommitThread extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(TestCommitThread.class);
- private static final MergeTreeOptions MERGE_TREE_OPTIONS;
- private static final long SUGGESTED_SST_FILE_SIZE = 1024;
-
- static {
- Configuration mergeTreeConf = new Configuration();
- mergeTreeConf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, MemorySize.parse("16 kb"));
- mergeTreeConf.set(MergeTreeOptions.PAGE_SIZE, MemorySize.parse("4 kb"));
- MERGE_TREE_OPTIONS = new MergeTreeOptions(mergeTreeConf);
- }
-
- private final FileFormat avro =
- FileFormat.fromIdentifier(
- FileStoreCommitTestBase.class.getClassLoader(), "avro", new Configuration());
-
private final Map<BinaryRowData, List<KeyValue>> data;
- private final FileStorePathFactory safePathFactory;
-
private final Map<BinaryRowData, MergeTreeWriter> writers;
- private final FileStoreScan scan;
+ private final FileStoreWrite write;
private final FileStoreCommit commit;
public TestCommitThread(
@@ -83,53 +54,15 @@ public class TestCommitThread extends Thread {
FileStorePathFactory testPathFactory,
FileStorePathFactory safePathFactory) {
this.data = data;
- this.safePathFactory = safePathFactory;
-
this.writers = new HashMap<>();
- this.scan =
- new FileStoreScanImpl(
- safePathFactory,
- createManifestFile(safePathFactory),
- createManifestList(safePathFactory));
-
- ManifestCommittableSerializer serializer =
- new ManifestCommittableSerializer(
- TestKeyValueGenerator.PARTITION_TYPE,
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE);
- ManifestFile testManifestFile = createManifestFile(testPathFactory);
- ManifestList testManifestList = createManifestList(testPathFactory);
- Configuration fileStoreConf = new Configuration();
- fileStoreConf.set(FileStoreOptions.BUCKET, 1);
- fileStoreConf.set(
- FileStoreOptions.MANIFEST_TARGET_FILE_SIZE,
- MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1) + "kb"));
- FileStoreOptions fileStoreOptions = new FileStoreOptions(fileStoreConf);
- FileStoreScanImpl testScan =
- new FileStoreScanImpl(testPathFactory, testManifestFile, testManifestList);
- this.commit =
- new FileStoreCommitImpl(
- UUID.randomUUID().toString(),
- serializer,
- testPathFactory,
- testManifestFile,
- testManifestList,
- fileStoreOptions,
- testScan);
- }
-
- private ManifestFile createManifestFile(FileStorePathFactory pathFactory) {
- return new ManifestFile(
- TestKeyValueGenerator.PARTITION_TYPE,
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
- avro,
- pathFactory);
- }
-
- private ManifestList createManifestList(FileStorePathFactory pathFactory) {
- return new ManifestList(TestKeyValueGenerator.PARTITION_TYPE, avro, pathFactory);
+ FileFormat avro =
+ FileFormat.fromIdentifier(
+ FileStoreCommitTestBase.class.getClassLoader(),
+ "avro",
+ new Configuration());
+ this.write = OperationTestUtils.createWrite(avro, safePathFactory);
+ this.commit = OperationTestUtils.createCommit(avro, testPathFactory);
}
@Override
@@ -155,11 +88,7 @@ public class TestCommitThread extends Thread {
break;
} catch (Throwable e) {
if (LOG.isDebugEnabled()) {
- LOG.warn(
- "["
- + Thread.currentThread().getName()
- + "] Failed to commit because of exception, try again",
- e);
+ LOG.warn("Failed to commit because of exception, try again", e);
}
writers.clear();
shouldCheckFilter = true;
@@ -214,13 +143,6 @@ public class TestCommitThread extends Thread {
}
private MergeTreeWriter createWriter(BinaryRowData partition) {
- SstFile sstFile =
- new SstFile(
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.ROW_TYPE,
- avro,
- safePathFactory.createSstPathFactory(partition, 0),
- SUGGESTED_SST_FILE_SIZE);
ExecutorService service =
Executors.newSingleThreadExecutor(
r -> {
@@ -228,23 +150,6 @@ public class TestCommitThread extends Thread {
t.setName(Thread.currentThread().getName() + "-writer-service-pool");
return t;
});
- MergeTree mergeTree =
- new MergeTree(
- MERGE_TREE_OPTIONS,
- sstFile,
- TestKeyValueGenerator.KEY_COMPARATOR,
- service,
- new DeduplicateAccumulator());
- Long latestSnapshotId = safePathFactory.latestSnapshotId();
- if (latestSnapshotId == null) {
- return (MergeTreeWriter) mergeTree.createWriter(Collections.emptyList());
- } else {
- return (MergeTreeWriter)
- mergeTree.createWriter(
- scan.withSnapshot(latestSnapshotId).plan().files().stream()
- .filter(e -> partition.equals(e.partition()))
- .map(ManifestEntry::file)
- .collect(Collectors.toList()));
- }
+ return (MergeTreeWriter) write.createWriter(partition, 0, service);
}
}