You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/13 08:55:22 UTC
[flink-table-store] 06/06: [FLINK-25628] Introduce SstFile and SstFileMeta
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 7ac835ffe828a511689ac080d90d2a729aa08d89
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Jan 12 19:56:26 2022 +0800
[FLINK-25628] Introduce SstFile and SstFileMeta
This closes #4
Co-authored-by: tsreaper <ts...@gmail.com>
---
.../table/store/file/mergetree/sst/SstFile.java | 269 +++++++++++++++++++++
.../store/file/mergetree/sst/SstFileMeta.java | 186 ++++++++++++++
.../file/mergetree/sst/SstFileMetaSerializer.java | 69 ++++++
.../flink/table/store/file/utils/FileUtils.java | 76 ++++++
.../mergetree/sst/SstFileMetaSerializerTest.java | 39 +++
.../store/file/mergetree/sst/SstFileTest.java | 245 +++++++++++++++++++
.../file/mergetree/sst/SstTestDataGenerator.java | 178 ++++++++++++++
.../file/utils/FailingAtomicRenameFileSystem.java | 143 +++++++++++
.../store/file/utils/FileStorePathFactory.java | 112 +++++++++
.../file/utils/TestAtomicRenameFileSystem.java | 121 +++++++++
.../org.apache.flink.core.fs.FileSystemFactory | 17 ++
11 files changed, 1455 insertions(+)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
new file mode 100644
index 0000000..d73ff45
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.KeyValueSerializer;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This file includes several {@link KeyValue}s, representing the changes inserted into the file
+ * storage.
+ */
+public class SstFile {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SstFile.class);
+
+    private final RowType keyType;
+    private final RowType valueType;
+
+    private final BulkFormat<RowData, FileSourceSplit> readerFactory;
+    private final BulkWriter.Factory<RowData> writerFactory;
+    private final SstPathFactory pathFactory;
+    private final long suggestedFileSize;
+
+    /**
+     * Creates an accessor for reading and writing sst files.
+     *
+     * @param keyType type of the keys stored in the files
+     * @param valueType type of the values stored in the files
+     * @param fileFormat format used to create the reader and writer factories, both for the
+     *     record type {@code KeyValue.schema(keyType, valueType)}
+     * @param pathFactory produces paths for new files and resolves file names to paths
+     * @param suggestedFileSize once the output position of the file being written reaches this
+     *     many bytes, the file is finished and a new one is started
+     */
+    public SstFile(
+            RowType keyType,
+            RowType valueType,
+            FileFormat fileFormat,
+            SstPathFactory pathFactory,
+            long suggestedFileSize) {
+        this.keyType = keyType;
+        this.valueType = valueType;
+
+        RowType recordType = KeyValue.schema(keyType, valueType);
+        this.readerFactory = fileFormat.createReaderFactory(recordType);
+        this.writerFactory = fileFormat.createWriterFactory(recordType);
+        this.pathFactory = pathFactory;
+        this.suggestedFileSize = suggestedFileSize;
+    }
+
+    public RowType keyType() {
+        return keyType;
+    }
+
+    public RowType valueType() {
+        return valueType;
+    }
+
+    @VisibleForTesting
+    public long suggestedFileSize() {
+        return suggestedFileSize;
+    }
+
+    /** Opens a {@link RecordReader} over the file with the given name (resolved by the path factory). */
+    public RecordReader read(String fileName) throws IOException {
+        return new SstFileRecordReader(pathFactory.toPath(fileName));
+    }
+
+    /**
+     * Write several {@link KeyValue}s into an sst file of a given level. Whenever the file being
+     * written reaches {@link #suggestedFileSize()} bytes it is finished and a new file is started,
+     * so the records may end up in several files.
+     *
+     * <p>NOTE: This method is atomic. If any exception is thrown, every file written by this call
+     * (finished or in progress) is deleted on a best-effort basis before the exception is
+     * rethrown. The given iterator is closed both on success and on failure.
+     *
+     * @return metadata of the written files in write order; empty if the iterator had no records
+     */
+    public List<SstFileMeta> write(CloseableIterator<KeyValue> iterator, int level)
+            throws Exception {
+        List<SstFileMeta> result = new ArrayList<>();
+
+        RollingFile rollingFile = null;
+        // path of the file currently being written; null while no file is in progress
+        Path currentPath = null;
+        boolean iteratorClosed = false;
+        try {
+            while (iterator.hasNext()) {
+                if (rollingFile == null) {
+                    currentPath = pathFactory.newPath();
+                    rollingFile = new RollingFile(currentPath, suggestedFileSize);
+                }
+                rollingFile.write(iterator.next());
+                if (rollingFile.exceedsSuggestedFileSize()) {
+                    result.add(rollingFile.finish(level));
+                    rollingFile = null;
+                    currentPath = null;
+                }
+            }
+            // finish last file
+            if (rollingFile != null) {
+                result.add(rollingFile.finish(level));
+                rollingFile = null;
+                currentPath = null;
+            }
+            // mark first so that a failing close() is not retried by the cleanup below
+            iteratorClosed = true;
+            iterator.close();
+        } catch (Throwable e) {
+            LOG.warn("Exception occurs when writing sst files. Cleaning up.", e);
+            // release the stream of the in-progress file before deleting the file
+            if (rollingFile != null) {
+                rollingFile.abort(e);
+            }
+            // clean up finished files
+            for (SstFileMeta meta : result) {
+                FileUtils.deleteOrWarn(pathFactory.toPath(meta.fileName()));
+            }
+            // clean up in-progress file
+            if (currentPath != null) {
+                FileUtils.deleteOrWarn(currentPath);
+            }
+            // the iterator must not leak just because writing failed
+            if (!iteratorClosed) {
+                try {
+                    iterator.close();
+                } catch (Throwable closeError) {
+                    if (closeError != e) {
+                        e.addSuppressed(closeError);
+                    }
+                }
+            }
+            throw e;
+        }
+
+        return result;
+    }
+
+    /** Deletes the given sst file, logging a warning instead of throwing on failure. */
+    public void delete(SstFileMeta file) {
+        FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName()));
+    }
+
+    /** Reads a whole sst file as a single split and converts each row back into a {@link KeyValue}. */
+    private class SstFileRecordReader implements RecordReader {
+
+        private final BulkFormat.Reader<RowData> reader;
+        private final KeyValueSerializer serializer;
+
+        private SstFileRecordReader(Path path) throws IOException {
+            long fileSize = FileUtils.getFileSize(path);
+            // one split covering the complete file
+            FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize, 0, fileSize);
+            this.reader = readerFactory.createReader(FileUtils.DEFAULT_READER_CONFIG, split);
+            this.serializer = new KeyValueSerializer(keyType, valueType);
+        }
+
+        @Nullable
+        @Override
+        public RecordIterator readBatch() throws IOException {
+            BulkFormat.RecordIterator<RowData> iterator = reader.readBatch();
+            return iterator == null ? null : new SstFileRecordIterator(iterator, serializer);
+        }
+
+        @Override
+        public void close() throws IOException {
+            reader.close();
+        }
+    }
+
+    /** Adapts a batch of {@link RowData} records to {@link KeyValue}s; returns null at batch end. */
+    private static class SstFileRecordIterator implements RecordReader.RecordIterator {
+
+        private final BulkFormat.RecordIterator<RowData> iterator;
+        private final KeyValueSerializer serializer;
+
+        private SstFileRecordIterator(
+                BulkFormat.RecordIterator<RowData> iterator, KeyValueSerializer serializer) {
+            this.iterator = iterator;
+            this.serializer = serializer;
+        }
+
+        @Override
+        public KeyValue next() throws IOException {
+            RecordAndPosition<RowData> result = iterator.next();
+            return result == null ? null : serializer.fromRow(result.getRecord());
+        }
+
+        @Override
+        public void releaseBatch() {
+            iterator.releaseBatch();
+        }
+    }
+
+    /** A single sst file being written, tracking the metadata needed for its {@link SstFileMeta}. */
+    private class RollingFile {
+        private final Path path;
+        private final long suggestedFileSize;
+
+        private final FSDataOutputStream out;
+        private final BulkWriter<RowData> writer;
+        private final KeyValueSerializer serializer;
+        private final RowDataSerializer keySerializer;
+
+        private long rowCount;
+        private BinaryRowData minKey;
+        private RowData maxKey;
+        private long minSequenceNumber;
+        private long maxSequenceNumber;
+        // set once closing the output stream has been attempted, so it is never closed twice
+        private boolean streamClosed;
+
+        private RollingFile(Path path, long suggestedFileSize) throws IOException {
+            this.path = path;
+            this.suggestedFileSize = suggestedFileSize;
+
+            this.out =
+                    this.path.getFileSystem().create(this.path, FileSystem.WriteMode.NO_OVERWRITE);
+            try {
+                this.writer = writerFactory.create(out);
+            } catch (IOException | RuntimeException e) {
+                // do not leak the freshly opened stream if the format writer cannot be created
+                try {
+                    out.close();
+                } catch (IOException closeError) {
+                    e.addSuppressed(closeError);
+                }
+                throw e;
+            }
+            this.serializer = new KeyValueSerializer(keyType, valueType);
+            this.keySerializer = new RowDataSerializer(keyType);
+
+            this.rowCount = 0;
+            this.minKey = null;
+            this.maxKey = null;
+            this.minSequenceNumber = Long.MAX_VALUE;
+            this.maxSequenceNumber = Long.MIN_VALUE;
+            this.streamClosed = false;
+        }
+
+        private void write(KeyValue kv) throws IOException {
+            writer.addElement(serializer.toRow(kv));
+
+            rowCount++;
+            if (minKey == null) {
+                // the first key is copied so later mutation of kv cannot affect it
+                minKey = keySerializer.toBinaryRow(kv.key()).copy();
+            }
+            // NOTE(review): only the reference is kept; finish() copies it. This assumes the key
+            // object is not reused/mutated by the source iterator before finish() runs - confirm.
+            maxKey = kv.key();
+            minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
+            maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
+        }
+
+        private boolean exceedsSuggestedFileSize() throws IOException {
+            // NOTE: this method is inaccurate for formats buffering changes in memory
+            return out.getPos() >= suggestedFileSize;
+        }
+
+        /**
+         * Closes the output stream of an unfinished file. Any failure is attached to {@code
+         * cause} as a suppressed exception instead of being thrown.
+         */
+        private void abort(Throwable cause) {
+            if (streamClosed) {
+                return;
+            }
+            streamClosed = true;
+            try {
+                out.close();
+            } catch (Throwable closeError) {
+                if (closeError != cause) {
+                    cause.addSuppressed(closeError);
+                }
+            }
+        }
+
+        /** Finishes the writer, closes the stream and builds the metadata of this file. */
+        private SstFileMeta finish(int level) throws IOException {
+            writer.finish();
+            streamClosed = true;
+            out.close();
+
+            // TODO
+            //  1. Read statistics directly from the written orc/parquet files.
+            //  2. For other file formats use StatsCollector. Make sure fields are not reused
+            //     otherwise we need copying.
+            FieldStats[] stats = new FieldStats[valueType.getFieldCount()];
+            for (int i = 0; i < stats.length; i++) {
+                stats[i] = new FieldStats(null, null, 0);
+            }
+
+            return new SstFileMeta(
+                    path.getName(),
+                    FileUtils.getFileSize(path),
+                    rowCount,
+                    minKey,
+                    keySerializer.toBinaryRow(maxKey).copy(),
+                    stats,
+                    minSequenceNumber,
+                    maxSequenceNumber,
+                    level);
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
new file mode 100644
index 0000000..5a7e99a
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Metadata of a SST file: its name, size and row count, the smallest and largest key, per-field
+ * statistics, the range of sequence numbers it contains and its level.
+ */
+public class SstFileMeta {
+
+    private final String fileName;
+    private final long fileSize;
+    private final long rowCount;
+
+    private final BinaryRowData minKey;
+    private final BinaryRowData maxKey;
+    private final FieldStats[] stats;
+
+    private final long minSequenceNumber;
+    private final long maxSequenceNumber;
+    private final int level;
+
+    /**
+     * Creates the metadata. The arguments are stored as given; in particular {@code stats} is not
+     * copied.
+     */
+    public SstFileMeta(
+            String fileName,
+            long fileSize,
+            long rowCount,
+            BinaryRowData minKey,
+            BinaryRowData maxKey,
+            FieldStats[] stats,
+            long minSequenceNumber,
+            long maxSequenceNumber,
+            int level) {
+        this.fileName = fileName;
+        this.fileSize = fileSize;
+        this.rowCount = rowCount;
+
+        this.minKey = minKey;
+        this.maxKey = maxKey;
+        this.stats = stats;
+
+        this.minSequenceNumber = minSequenceNumber;
+        this.maxSequenceNumber = maxSequenceNumber;
+        this.level = level;
+    }
+
+    public String fileName() {
+        return fileName;
+    }
+
+    public long fileSize() {
+        return fileSize;
+    }
+
+    public long rowCount() {
+        return rowCount;
+    }
+
+    public BinaryRowData minKey() {
+        return minKey;
+    }
+
+    public BinaryRowData maxKey() {
+        return maxKey;
+    }
+
+    /**
+     * Element in the array may be null, indicating the statistics of this field is unknown.
+     *
+     * <p>The internal array is returned (not a copy); callers must not modify it.
+     */
+    public FieldStats[] stats() {
+        return stats;
+    }
+
+    public long minSequenceNumber() {
+        return minSequenceNumber;
+    }
+
+    public long maxSequenceNumber() {
+        return maxSequenceNumber;
+    }
+
+    public int level() {
+        return level;
+    }
+
+    /**
+     * Returns a copy of this metadata with the level set to {@code newLevel}; every other field is
+     * kept unchanged.
+     *
+     * @throws IllegalArgumentException if {@code newLevel} is not greater than the current level
+     */
+    public SstFileMeta upgrade(int newLevel) {
+        checkArgument(
+                newLevel > this.level,
+                "Cannot upgrade sst file %s from level %s to level %s, the new level must be greater.",
+                fileName,
+                this.level,
+                newLevel);
+        return new SstFileMeta(
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                stats,
+                minSequenceNumber,
+                maxSequenceNumber,
+                newLevel);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof SstFileMeta)) {
+            return false;
+        }
+        SstFileMeta that = (SstFileMeta) o;
+        return Objects.equals(fileName, that.fileName)
+                && fileSize == that.fileSize
+                && rowCount == that.rowCount
+                && Objects.equals(minKey, that.minKey)
+                && Objects.equals(maxKey, that.maxKey)
+                // arrays must be compared by content, not by reference
+                && Arrays.equals(stats, that.stats)
+                && minSequenceNumber == that.minSequenceNumber
+                && maxSequenceNumber == that.maxSequenceNumber
+                && level == that.level;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                // by default, hash code of arrays are computed by reference, not by content.
+                // so we must use Arrays.hashCode to hash by content.
+                Arrays.hashCode(stats),
+                minSequenceNumber,
+                maxSequenceNumber,
+                level);
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "{%s, %d, %d, %s, %s, %s, %d, %d, %d}",
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                Arrays.toString(stats),
+                minSequenceNumber,
+                maxSequenceNumber,
+                level);
+    }
+
+    /**
+     * Returns the row type used to store an {@link SstFileMeta}. The field order here is the
+     * order {@link SstFileMetaSerializer} reads and writes.
+     *
+     * @param keyType type of the min/max key fields
+     * @param rowType value row type whose per-field statistics are stored in {@code _STATS}
+     */
+    public static RowType schema(RowType keyType, RowType rowType) {
+        List<RowType.RowField> fields = new ArrayList<>();
+        fields.add(new RowType.RowField("_FILE_NAME", new VarCharType(false, Integer.MAX_VALUE)));
+        fields.add(new RowType.RowField("_FILE_SIZE", new BigIntType(false)));
+        fields.add(new RowType.RowField("_ROW_COUNT", new BigIntType(false)));
+        fields.add(new RowType.RowField("_MIN_KEY", keyType));
+        fields.add(new RowType.RowField("_MAX_KEY", keyType));
+        fields.add(new RowType.RowField("_STATS", FieldStatsArraySerializer.schema(rowType)));
+        fields.add(new RowType.RowField("_MIN_SEQUENCE_NUMBER", new BigIntType(false)));
+        fields.add(new RowType.RowField("_MAX_SEQUENCE_NUMBER", new BigIntType(false)));
+        fields.add(new RowType.RowField("_LEVEL", new IntType(false)));
+        return new RowType(fields);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
new file mode 100644
index 0000000..0e176d1
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.utils.ObjectSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+/** Serializer for {@link SstFileMeta}. */
+public class SstFileMetaSerializer extends ObjectSerializer<SstFileMeta> {
+
+ private final RowDataSerializer keySerializer;
+ private final FieldStatsArraySerializer statsArraySerializer;
+
+ public SstFileMetaSerializer(RowType keyType, RowType rowType) {
+ super(SstFileMeta.schema(keyType, rowType));
+ this.keySerializer = new RowDataSerializer(keyType);
+ this.statsArraySerializer = new FieldStatsArraySerializer(rowType);
+ }
+
+ @Override
+ public RowData toRow(SstFileMeta meta) {
+ return GenericRowData.of(
+ StringData.fromString(meta.fileName()),
+ meta.fileSize(),
+ meta.rowCount(),
+ meta.minKey(),
+ meta.maxKey(),
+ statsArraySerializer.toRow(meta.stats()),
+ meta.minSequenceNumber(),
+ meta.maxSequenceNumber(),
+ meta.level());
+ }
+
+ @Override
+ public SstFileMeta fromRow(RowData row) {
+ int keyFieldCount = keySerializer.getArity();
+ return new SstFileMeta(
+ row.getString(0).toString(),
+ row.getLong(1),
+ row.getLong(2),
+ keySerializer.toBinaryRow(row.getRow(3, keyFieldCount)).copy(),
+ keySerializer.toBinaryRow(row.getRow(4, keyFieldCount)).copy(),
+ statsArraySerializer.fromRow(row.getRow(5, statsArraySerializer.numFields())),
+ row.getLong(6),
+ row.getLong(7),
+ row.getInt(8));
+ }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
new file mode 100644
index 0000000..b787b9a
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.Utils;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Utils for file reading and writing. */
+public class FileUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);
+
+    /**
+     * Reader configuration used when creating {@link BulkFormat.Reader}s. The element queue
+     * capacity ({@link SourceReaderOptions#ELEMENT_QUEUE_CAPACITY}) is set to 1.
+     *
+     * <p>NOTE: this is a shared, mutable instance; callers must not modify it.
+     */
+    public static final Configuration DEFAULT_READER_CONFIG = new Configuration();
+
+    static {
+        DEFAULT_READER_CONFIG.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
+    }
+
+    /**
+     * Reads all records of the file at {@code path} and converts each with {@code serializer}.
+     *
+     * @param path file to read; it is read as a single split covering the whole file
+     * @param serializer converts each read row into a {@code T}
+     * @param readerFactory format used to read the file
+     * @return the converted records in read order
+     * @throws IOException if the file size cannot be determined or reading fails
+     */
+    public static <T> List<T> readListFromFile(
+            Path path,
+            ObjectSerializer<T> serializer,
+            BulkFormat<RowData, FileSourceSplit> readerFactory)
+            throws IOException {
+        List<T> result = new ArrayList<>();
+        long fileSize = FileUtils.getFileSize(path);
+        FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize, 0, fileSize);
+        BulkFormat.Reader<RowData> reader =
+                readerFactory.createReader(DEFAULT_READER_CONFIG, split);
+        Utils.forEachRemaining(reader, row -> result.add(serializer.fromRow(row)));
+        return result;
+    }
+
+    /** Returns the length in bytes of the file at {@code path}, as reported by its file system. */
+    public static long getFileSize(Path path) throws IOException {
+        return path.getFileSystem().getFileStatus(path).getLen();
+    }
+
+    /**
+     * Deletes {@code file} (non-recursively) without throwing. A warning is logged if the delete
+     * reports failure and the file still exists, or if an {@link IOException} occurs.
+     */
+    public static void deleteOrWarn(Path file) {
+        try {
+            FileSystem fs = file.getFileSystem();
+            // a false return is only a problem if the file is actually still there
+            if (!fs.delete(file, false) && fs.exists(file)) {
+                LOG.warn("Failed to delete file {}", file);
+            }
+        } catch (IOException e) {
+            LOG.warn("Exception occurs when deleting file {}", file, e);
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializerTest.java
new file mode 100644
index 0000000..28e03b6
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializerTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.utils.ObjectSerializerTestBase;
+
+/** Tests for {@link SstFileMetaSerializer}. */
+public class SstFileMetaSerializerTest extends ObjectSerializerTestBase<SstFileMeta> {
+
+ private final SstTestDataGenerator gen = SstTestDataGenerator.builder().build();
+
+ @Override
+ protected SstFileMetaSerializer serializer() {
+ return new SstFileMetaSerializer(
+ TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
+ }
+
+ @Override
+ protected SstFileMeta object() {
+ return gen.next().meta;
+ }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
new file mode 100644
index 0000000..de6ed25
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.KeyValueSerializerTest;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SstFile}. */
+public class SstFileTest {
+
+    private final SstTestDataGenerator gen =
+            SstTestDataGenerator.builder().memTableCapacity(20).build();
+    private final FileFormat flushingAvro = new FlushingAvroFormat();
+
+    @TempDir java.nio.file.Path tempDir;
+
+    /**
+     * Writes random data, then checks the rolling-file metadata, reads every file back and
+     * compares its contents with the input, and round-trips each meta through the serializer.
+     */
+    @RepeatedTest(10)
+    public void testWriteAndReadSstFile() throws Exception {
+        SstTestDataGenerator.SstFile data = gen.next();
+        SstFile sstFile = createSstFile(tempDir.toString());
+        SstFileMetaSerializer serializer =
+                new SstFileMetaSerializer(
+                        TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE);
+
+        List<SstFileMeta> actualMetas =
+                sstFile.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
+
+        checkRollingFiles(data.meta, actualMetas, sstFile.suggestedFileSize());
+
+        Iterator<KeyValue> expectedIterator = data.content.iterator();
+        for (SstFileMeta meta : actualMetas) {
+            // check the contents of sst file; try-with-resources closes the reader even when an
+            // assertion fails
+            try (CloseableIterator<KeyValue> actualKvsIterator =
+                    new RecordReaderIterator(sstFile.read(meta.fileName()))) {
+                while (actualKvsIterator.hasNext()) {
+                    assertThat(expectedIterator.hasNext()).isTrue();
+                    KeyValue actualKv = actualKvsIterator.next();
+                    assertThat(
+                                    KeyValueSerializerTest.equals(
+                                            expectedIterator.next(),
+                                            actualKv,
+                                            TestKeyValueGenerator.KEY_SERIALIZER,
+                                            TestKeyValueGenerator.ROW_SERIALIZER))
+                            .isTrue();
+                }
+            }
+
+            // check that each sst file meta is serializable
+            assertThat(serializer.fromRow(serializer.toRow(meta))).isEqualTo(meta);
+        }
+        assertThat(expectedIterator.hasNext()).isFalse();
+    }
+
+    /**
+     * Writes through a file system that fails randomly; if writing fails, no file may be left
+     * behind in any bucket directory.
+     */
+    @RepeatedTest(10)
+    public void testCleanUpForException() throws IOException {
+        FailingAtomicRenameFileSystem.resetFailCounter(1);
+        FailingAtomicRenameFileSystem.setFailPossibility(10);
+        SstTestDataGenerator.SstFile data = gen.next();
+        SstFile sstFile =
+                createSstFile(FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
+
+        try {
+            sstFile.write(CloseableIterator.fromList(data.content, kv -> {}), 0);
+        } catch (Throwable e) {
+            assertThat(e)
+                    .isExactlyInstanceOf(FailingAtomicRenameFileSystem.ArtificialException.class);
+            Path root = new Path(tempDir.toString());
+            FileSystem fs = root.getFileSystem();
+            for (FileStatus bucketStatus : fs.listStatus(root)) {
+                assertThat(bucketStatus.isDir()).isTrue();
+                assertThat(fs.listStatus(bucketStatus.getPath())).isEmpty();
+            }
+        }
+    }
+
+    /** Creates an {@link SstFile} under {@code path} with a random suggested size in [1024, 9216). */
+    private SstFile createSstFile(String path) {
+        FileStorePathFactory fileStorePathFactory = new FileStorePathFactory(new Path(path));
+        SstPathFactory sstPathFactory = fileStorePathFactory.createSstPathFactory(null, 0);
+        int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
+        return new SstFile(
+                TestKeyValueGenerator.KEY_TYPE,
+                TestKeyValueGenerator.ROW_TYPE,
+                // normal avro format will buffer changes in memory and we can't determine
+                // if the written file size is really larger than suggested, so we use a
+                // special avro format which flushes for every added element
+                flushingAvro,
+                sstPathFactory,
+                suggestedFileSize);
+    }
+
+    /** Checks that the rolled files together describe the same data as {@code expected}. */
+    private void checkRollingFiles(
+            SstFileMeta expected, List<SstFileMeta> actual, long suggestedFileSize) {
+        // fail with a clear message instead of an IndexOutOfBoundsException below
+        assertThat(actual).isNotEmpty();
+
+        // all but last file should be no smaller than suggestedFileSize
+        for (int i = 0; i + 1 < actual.size(); i++) {
+            assertThat(actual.get(i).fileSize()).isGreaterThanOrEqualTo(suggestedFileSize);
+        }
+
+        // expected.rowCount == sum(rowCount)
+        assertThat(actual.stream().mapToLong(SstFileMeta::rowCount).sum())
+                .isEqualTo(expected.rowCount());
+
+        // expected.minKey == firstFile.minKey
+        assertThat(actual.get(0).minKey()).isEqualTo(expected.minKey());
+
+        // expected.maxKey == lastFile.maxKey
+        assertThat(actual.get(actual.size() - 1).maxKey()).isEqualTo(expected.maxKey());
+
+        // TODO check stats after they're collected
+        /*
+        for (int i = 0; i < expected.stats().length; i++) {
+            List<FieldStats> actualStats = new ArrayList<>();
+            for (SstFileMeta meta : actual) {
+                actualStats.add(meta.stats()[i]);
+            }
+            checkRollingFileStats(expected.stats()[i], actualStats);
+        }
+        */
+
+        // expected.minSequenceNumber == min(minSequenceNumber)
+        assertThat(actual.stream().mapToLong(SstFileMeta::minSequenceNumber).min().orElse(-1))
+                .isEqualTo(expected.minSequenceNumber());
+
+        // expected.maxSequenceNumber == max(maxSequenceNumber)
+        assertThat(actual.stream().mapToLong(SstFileMeta::maxSequenceNumber).max().orElse(-1))
+                .isEqualTo(expected.maxSequenceNumber());
+
+        // expected.level == eachFile.level
+        for (SstFileMeta meta : actual) {
+            assertThat(meta.level()).isEqualTo(expected.level());
+        }
+    }
+
+    /**
+     * Checks that per-file stats combine into {@code expected}: overall min/max for comparable
+     * values (otherwise all null), and the sum of null counts. Currently unused, see TODO above.
+     */
+    @SuppressWarnings("unchecked")
+    private void checkRollingFileStats(FieldStats expected, List<FieldStats> actual) {
+        if (expected.minValue() instanceof Comparable) {
+            Object actualMin = null;
+            Object actualMax = null;
+            for (FieldStats stats : actual) {
+                if (stats.minValue() != null
+                        && (actualMin == null
+                                || ((Comparable<Object>) stats.minValue()).compareTo(actualMin)
+                                        < 0)) {
+                    actualMin = stats.minValue();
+                }
+                if (stats.maxValue() != null
+                        && (actualMax == null
+                                || ((Comparable<Object>) stats.maxValue()).compareTo(actualMax)
+                                        > 0)) {
+                    actualMax = stats.maxValue();
+                }
+            }
+            assertThat(actualMin).isEqualTo(expected.minValue());
+            assertThat(actualMax).isEqualTo(expected.maxValue());
+        } else {
+            for (FieldStats stats : actual) {
+                assertThat(stats.minValue()).isNull();
+                assertThat(stats.maxValue()).isNull();
+            }
+        }
+        assertThat(actual.stream().mapToLong(FieldStats::nullCount).sum())
+                .isEqualTo(expected.nullCount());
+    }
+
+    /** A special avro {@link FileFormat} which flushes for every added element. */
+    public static class FlushingAvroFormat implements FileFormat {
+
+        private final FileFormat avro =
+                FileFormat.fromIdentifier(
+                        SstFileTest.class.getClassLoader(), "avro", new Configuration());
+
+        @Override
+        public BulkFormat<RowData, FileSourceSplit> createReaderFactory(
+                RowType type, List<ResolvedExpression> filters) {
+            return avro.createReaderFactory(type, filters);
+        }
+
+        @Override
+        public BulkWriter.Factory<RowData> createWriterFactory(RowType type) {
+            return fsDataOutputStream -> {
+                BulkWriter<RowData> wrapped =
+                        avro.createWriterFactory(type).create(fsDataOutputStream);
+                return new BulkWriter<RowData>() {
+                    @Override
+                    public void addElement(RowData rowData) throws IOException {
+                        wrapped.addElement(rowData);
+                        // flush so the stream position reflects every written element
+                        wrapped.flush();
+                    }
+
+                    @Override
+                    public void flush() throws IOException {
+                        wrapped.flush();
+                    }
+
+                    @Override
+                    public void finish() throws IOException {
+                        wrapped.finish();
+                    }
+                };
+            };
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
new file mode 100644
index 0000000..94b2f08
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Generator of random in-memory SST files ({@link SstTestDataGenerator.SstFile}), each carrying an
+ * {@link SstFileMeta} which describes its content. Key-values come from a {@link
+ * TestKeyValueGenerator} and are buffered per (bucket, partition) until a mem table is full.
+ */
+public class SstTestDataGenerator {
+
+    private final int numBuckets;
+    private final int memTableCapacity;
+
+    // index = bucket; each map buffers the not yet flushed key-values of every partition
+    private final List<Map<BinaryRowData, List<KeyValue>>> memTables;
+    private final TestKeyValueGenerator gen;
+
+    private SstTestDataGenerator(int numBuckets, int memTableCapacity) {
+        // numBuckets <= 0 would make the bucket computation divide by zero, and
+        // memTableCapacity <= 0 would break (or endlessly loop in) the file splitting
+        if (numBuckets <= 0) {
+            throw new IllegalArgumentException(
+                    "numBuckets must be positive, but is " + numBuckets);
+        }
+        if (memTableCapacity <= 0) {
+            throw new IllegalArgumentException(
+                    "memTableCapacity must be positive, but is " + memTableCapacity);
+        }
+        this.numBuckets = numBuckets;
+        this.memTableCapacity = memTableCapacity;
+
+        this.memTables = new ArrayList<>(numBuckets);
+        for (int i = 0; i < numBuckets; i++) {
+            memTables.add(new HashMap<>());
+        }
+        this.gen = new TestKeyValueGenerator();
+    }
+
+    /**
+     * Generates key-values until the mem table of some (bucket, partition) pair reaches {@code
+     * memTableCapacity} records, then flushes that mem table into a single level-0 file.
+     *
+     * @return the newly flushed level-0 file
+     */
+    public SstFile next() {
+        while (true) {
+            KeyValue kv = gen.next();
+            BinaryRowData key = (BinaryRowData) kv.key();
+            BinaryRowData partition = gen.getPartition(kv);
+            // floorMod keeps the bucket non-negative even for negative hash codes
+            int bucket = Math.floorMod(key.hashCode(), numBuckets);
+            List<KeyValue> memTable =
+                    memTables.get(bucket).computeIfAbsent(partition, k -> new ArrayList<>());
+            memTable.add(kv);
+
+            if (memTable.size() >= memTableCapacity) {
+                List<SstFile> result = createSstFiles(memTable, 0, partition, bucket);
+                memTable.clear();
+                // a level-0 file holds up to memTableCapacity records, so one file suffices
+                assert result.size() == 1;
+                return result.get(0);
+            }
+        }
+    }
+
+    /**
+     * Sorts {@code kvs} in place, keeps only the last record of every run of equal keys, and splits
+     * the remaining records into files of the given level. A file of level {@code l} holds at most
+     * {@code memTableCapacity^(l + 1)} records.
+     *
+     * @param kvs key-values to write; NOTE: this list is sorted in place
+     * @param level level of the created files
+     * @param partition partition of the created files
+     * @param bucket bucket of the created files
+     * @return the created files; an empty list if {@code kvs} is empty
+     */
+    public List<SstFile> createSstFiles(
+            List<KeyValue> kvs, int level, BinaryRowData partition, int bucket) {
+        if (kvs.isEmpty()) {
+            return new ArrayList<>();
+        }
+        gen.sort(kvs);
+        // after sorting, keep the last record of each run of equal keys (presumably the newest
+        // one, depending on the order defined by gen.sort)
+        List<KeyValue> combined = new ArrayList<>();
+        for (int i = 0; i + 1 < kvs.size(); i++) {
+            KeyValue now = kvs.get(i);
+            KeyValue next = kvs.get(i + 1);
+            if (!now.key().equals(next.key())) {
+                combined.add(now);
+            }
+        }
+        combined.add(kvs.get(kvs.size() - 1));
+
+        // capacity = memTableCapacity^(level + 1), saturated to avoid int overflow
+        int capacity = memTableCapacity;
+        for (int i = 0; i < level && capacity < Integer.MAX_VALUE; i++) {
+            capacity = (int) Math.min((long) capacity * memTableCapacity, Integer.MAX_VALUE);
+        }
+        List<SstFile> result = new ArrayList<>();
+        for (int i = 0; i < combined.size(); i += capacity) {
+            // copy the slice so that files do not share a backing list with each other
+            List<KeyValue> slice =
+                    new ArrayList<>(
+                            combined.subList(i, Math.min(i + capacity, combined.size())));
+            result.add(createSstFile(slice, level, partition, bucket));
+        }
+        return result;
+    }
+
+    /**
+     * Creates one file of the given level from {@code kvs}, computing its approximate size, min and
+     * max key, value statistics and sequence number range.
+     */
+    private SstFile createSstFile(
+            List<KeyValue> kvs, int level, BinaryRowData partition, int bucket) {
+        FieldStatsCollector collector = new FieldStatsCollector(TestKeyValueGenerator.ROW_TYPE);
+        long totalSize = 0;
+        BinaryRowData minKey = null;
+        BinaryRowData maxKey = null;
+        long minSequenceNumber = Long.MAX_VALUE;
+        long maxSequenceNumber = Long.MIN_VALUE;
+        for (KeyValue kv : kvs) {
+            BinaryRowData key = (BinaryRowData) kv.key();
+            BinaryRowData value = (BinaryRowData) kv.value();
+            // the file size is approximated by the binary size of keys and values
+            totalSize += key.getSizeInBytes() + value.getSizeInBytes();
+            // statistics are only collected over the values
+            collector.collect(value);
+            if (minKey == null || gen.compareKeys(key, minKey) < 0) {
+                minKey = key;
+            }
+            if (maxKey == null || gen.compareKeys(key, maxKey) > 0) {
+                maxKey = key;
+            }
+            minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
+            maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
+        }
+
+        return new SstFile(
+                partition,
+                bucket,
+                new SstFileMeta(
+                        "sst-" + UUID.randomUUID(),
+                        totalSize,
+                        kvs.size(),
+                        minKey,
+                        maxKey,
+                        collector.extract(),
+                        minSequenceNumber,
+                        maxSequenceNumber,
+                        level),
+                kvs);
+    }
+
+    /**
+     * An in-memory SST file: its partition, bucket, metadata and records. Not to be confused with
+     * the top-level {@code SstFile} class of the same package.
+     */
+    public static class SstFile {
+        public final BinaryRowData partition;
+        public final int bucket;
+        public final SstFileMeta meta;
+        public final List<KeyValue> content;
+
+        private SstFile(
+                BinaryRowData partition, int bucket, SstFileMeta meta, List<KeyValue> content) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.meta = meta;
+            this.content = content;
+        }
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Builder for {@link SstTestDataGenerator}; both settings default to 3. */
+    public static class Builder {
+        private int numBuckets = 3;
+        private int memTableCapacity = 3;
+
+        /** Sets the number of buckets; must be positive. */
+        public Builder numBuckets(int value) {
+            this.numBuckets = value;
+            return this;
+        }
+
+        /** Sets the number of records flushed into one level-0 file; must be positive. */
+        public Builder memTableCapacity(int value) {
+            this.memTableCapacity = value;
+            return this;
+        }
+
+        public SstTestDataGenerator build() {
+            return new SstTestDataGenerator(numBuckets, memTableCapacity);
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
new file mode 100644
index 0000000..eb3b32a
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataInputStreamWrapper;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FSDataOutputStreamWrapper;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A {@link TestAtomicRenameFileSystem} which may fail when reading and writing. Mainly used to
+ * check if components deal with failures correctly.
+ *
+ * <p>Each read or write call fails with probability {@code 1 / failPossibility}, but only while the
+ * counter set by {@link #resetFailCounter(int)} is still positive, so the total number of injected
+ * failures is bounded by that counter.
+ */
+public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem {
+
+    public static final String SCHEME = "fail";
+
+    private static final AtomicInteger failCounter = new AtomicInteger();
+    // read from I/O threads while tests may change it, hence volatile
+    private static volatile int failPossibility = 1000;
+
+    /** Allows at most {@code maxValue} more artificial failures. */
+    public static void resetFailCounter(int maxValue) {
+        failCounter.set(maxValue);
+    }
+
+    /**
+     * Sets the inverse failure probability: each I/O call fails with probability {@code 1 / v}.
+     *
+     * @throws IllegalArgumentException if {@code v} is not positive
+     */
+    public static void setFailPossibility(int v) {
+        if (v <= 0) {
+            throw new IllegalArgumentException("Fail possibility must be positive, but is " + v);
+        }
+        failPossibility = v;
+    }
+
+    /**
+     * Decides whether the current I/O call should fail. The counter is only consumed when the
+     * random draw hits, so both reads and writes respect the bound set by {@link
+     * #resetFailCounter(int)}.
+     */
+    private static boolean shouldFail() {
+        return ThreadLocalRandom.current().nextInt(failPossibility) == 0
+                && failCounter.getAndDecrement() > 0;
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+        return new FailingFSDataInputStreamWrapper(super.open(f, bufferSize));
+    }
+
+    @Override
+    public FSDataInputStream open(Path f) throws IOException {
+        return new FailingFSDataInputStreamWrapper(super.open(f));
+    }
+
+    @Override
+    public FSDataOutputStream create(Path filePath, FileSystem.WriteMode overwrite)
+            throws IOException {
+        return new FailingFSDataOutputStreamWrapper(super.create(filePath, overwrite));
+    }
+
+    @Override
+    public URI getUri() {
+        return URI.create(SCHEME + ":///");
+    }
+
+    /** {@link FileSystemFactory} for {@link FailingAtomicRenameFileSystem}. */
+    public static final class FailingAtomicRenameFileSystemFactory implements FileSystemFactory {
+
+        @Override
+        public String getScheme() {
+            return SCHEME;
+        }
+
+        @Override
+        public FileSystem create(URI uri) throws IOException {
+            return new FailingAtomicRenameFileSystem();
+        }
+    }
+
+    /** Specific {@link IOException} produced by {@link FailingAtomicRenameFileSystem}. */
+    public static final class ArtificialException extends IOException {
+
+        public ArtificialException() {
+            super("Artificial exception");
+        }
+    }
+
+    /** Input stream wrapper injecting {@link ArtificialException}s into read calls. */
+    private static class FailingFSDataInputStreamWrapper extends FSDataInputStreamWrapper {
+
+        public FailingFSDataInputStreamWrapper(FSDataInputStream inputStream) {
+            super(inputStream);
+        }
+
+        @Override
+        public int read() throws IOException {
+            if (shouldFail()) {
+                throw new ArtificialException();
+            }
+            return super.read();
+        }
+
+        @Override
+        public int read(byte[] b) throws IOException {
+            // route through read(b, off, len) so that this overload is failure-injected as well,
+            // even if the base wrapper forwards it directly to the wrapped stream
+            return read(b, 0, b.length);
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            if (shouldFail()) {
+                throw new ArtificialException();
+            }
+            return super.read(b, off, len);
+        }
+    }
+
+    /** Output stream wrapper injecting {@link ArtificialException}s into write calls. */
+    private static class FailingFSDataOutputStreamWrapper extends FSDataOutputStreamWrapper {
+
+        public FailingFSDataOutputStreamWrapper(FSDataOutputStream outputStream) {
+            super(outputStream);
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            if (shouldFail()) {
+                throw new ArtificialException();
+            }
+            super.write(b);
+        }
+
+        @Override
+        public void write(byte[] b) throws IOException {
+            // route through write(b, off, len) so that this overload is failure-injected as well
+            write(b, 0, b.length);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            if (shouldFail()) {
+                throw new ArtificialException();
+            }
+            super.write(b, off, len);
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
new file mode 100644
index 0000000..2ae6946
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.connector.file.table.RowDataPartitionComputer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
+import org.apache.flink.table.utils.PartitionPathUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+
+/**
+ * Creates the {@link Path}s of the files stored under a table store root directory: manifest
+ * files, manifest lists, snapshots, and (via {@link SstPathFactory}) SST files.
+ */
+public class FileStorePathFactory {
+
+    private static final String MANIFEST_DIR = "/manifest/";
+
+    private final Path root;
+    private final String uuid;
+    @Nullable private final RowDataPartitionComputer partitionComputer;
+
+    private int manifestFileCount;
+    private int manifestListCount;
+
+    public FileStorePathFactory(Path root) {
+        this(root, null, FileSystemConnectorOptions.PARTITION_DEFAULT_NAME.defaultValue());
+    }
+
+    /**
+     * @param root root directory of the table store
+     * @param partitionType type of the partition row, or {@code null} for an unpartitioned table
+     * @param defaultPartValue partition value name used for null partition fields
+     */
+    public FileStorePathFactory(
+            Path root, @Nullable RowType partitionType, String defaultPartValue) {
+        this.root = root;
+        this.uuid = UUID.randomUUID().toString();
+        this.partitionComputer =
+                partitionType == null
+                        ? null
+                        : createPartitionComputer(partitionType, defaultPartValue);
+        this.manifestFileCount = 0;
+        this.manifestListCount = 0;
+    }
+
+    /** Builds a computer turning partition rows into (column name -> value string) pairs. */
+    private static RowDataPartitionComputer createPartitionComputer(
+            RowType partitionType, String defaultPartValue) {
+        String[] columnNames = partitionType.getFieldNames().toArray(new String[0]);
+        DataType[] columnTypes =
+                partitionType.getFields().stream()
+                        .map(field -> LogicalTypeDataTypeConverter.toDataType(field.getType()))
+                        .toArray(DataType[]::new);
+        return new RowDataPartitionComputer(
+                defaultPartValue, columnNames, columnTypes, columnNames);
+    }
+
+    /** Resolves a file name inside the manifest directory. */
+    private Path manifestPath(String fileName) {
+        return new Path(root + MANIFEST_DIR + fileName);
+    }
+
+    /** Returns a new unique manifest file path; names are numbered per factory instance. */
+    public Path newManifestFile() {
+        String fileName = "manifest-" + uuid + "-" + manifestFileCount;
+        manifestFileCount++;
+        return manifestPath(fileName);
+    }
+
+    /** Returns a new unique manifest list path; names are numbered per factory instance. */
+    public Path newManifestList() {
+        String fileName = "manifest-list-" + uuid + "-" + manifestListCount;
+        manifestListCount++;
+        return manifestPath(fileName);
+    }
+
+    public Path toManifestFilePath(String manifestFileName) {
+        return manifestPath(manifestFileName);
+    }
+
+    public Path toManifestListPath(String manifestListName) {
+        return manifestPath(manifestListName);
+    }
+
+    public Path toSnapshotPath(long id) {
+        return new Path(root + "/snapshot/snapshot-" + id);
+    }
+
+    public SstPathFactory createSstPathFactory(@Nullable BinaryRowData partition, int bucket) {
+        return new SstPathFactory(root, getPartitionString(partition), bucket);
+    }
+
+    /**
+     * Returns the relative partition path of {@code partition}, or {@code null} when the table is
+     * not partitioned.
+     *
+     * @throws NullPointerException if the table is partitioned but {@code partition} is null
+     */
+    public @Nullable String getPartitionString(@Nullable BinaryRowData partition) {
+        if (partitionComputer != null) {
+            BinaryRowData checkedPartition =
+                    Preconditions.checkNotNull(
+                            partition, "Partition row data is null. This is unexpected.");
+            return PartitionPathUtils.generatePartitionPath(
+                    partitionComputer.generatePartValues(checkedPartition));
+        }
+        return null;
+    }
+
+    @VisibleForTesting
+    public String uuid() {
+        return uuid;
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestAtomicRenameFileSystem.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestAtomicRenameFileSystem.java
new file mode 100644
index 0000000..98f4101
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestAtomicRenameFileSystem.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileStatus;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.DirectoryNotEmptyException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A modified {@link LocalFileSystem} supporting atomic rename: a rename fails (returns {@code
+ * false}) instead of overwriting when the destination already exists.
+ */
+public class TestAtomicRenameFileSystem extends LocalFileSystem {
+
+    public static final String SCHEME = "test";
+
+    // All renames in this JVM are serialized through one lock. It is static rather than per
+    // instance so that atomicity also holds across instances, e.g. between this file system and
+    // FailingAtomicRenameFileSystem, which uses another scheme but works on the same local files.
+    private static final ReentrantLock RENAME_LOCK = new ReentrantLock();
+
+    /**
+     * Moves {@code src} to {@code dst}, creating the parent directories of {@code dst} if needed.
+     *
+     * @return {@code true} if the move succeeded; {@code false} if the source does not exist, the
+     *     destination already exists, or access is denied
+     */
+    @Override
+    public boolean rename(final Path src, final Path dst) throws IOException {
+        File srcFile = pathToFile(src);
+        File dstFile = pathToFile(dst);
+        File dstParent = dstFile.getParentFile();
+        if (dstParent != null) {
+            dstParent.mkdirs();
+        }
+        // lock before entering try, so that finally never unlocks a lock that is not held
+        RENAME_LOCK.lock();
+        try {
+            // without REPLACE_EXISTING, Files.move fails if dst exists; holding the lock makes
+            // that check and the move atomic with respect to other renames in this JVM
+            Files.move(srcFile.toPath(), dstFile.toPath());
+            return true;
+        } catch (NoSuchFileException
+                | AccessDeniedException
+                | DirectoryNotEmptyException
+                | SecurityException
+                | FileAlreadyExistsException e) {
+            return false;
+        } finally {
+            RENAME_LOCK.unlock();
+        }
+    }
+
+    /**
+     * Lists the status of {@code f}: a single entry if it is a file, its children if it is a
+     * directory, and {@code null} if it does not exist.
+     */
+    @Override
+    public FileStatus[] listStatus(final Path f) throws IOException {
+        // TODO remove this method once FLINK-25453 is fixed
+        File localf = pathToFile(f);
+        if (!localf.exists()) {
+            return null;
+        }
+        if (localf.isFile()) {
+            return new FileStatus[] {new LocalFileStatus(localf, this)};
+        }
+
+        final String[] names = localf.list();
+        if (names == null) {
+            return null;
+        }
+        List<FileStatus> results = new ArrayList<>(names.length);
+        for (String name : names) {
+            try {
+                results.add(getFileStatus(new Path(f, name)));
+            } catch (FileNotFoundException e) {
+                // ignore files not found, since the directory contents may have changed
+                // after the names[] list was generated
+            }
+        }
+
+        return results.toArray(new FileStatus[0]);
+    }
+
+    @Override
+    public URI getUri() {
+        return URI.create(SCHEME + ":///");
+    }
+
+    /** {@link FileSystemFactory} for {@link TestAtomicRenameFileSystem}. */
+    public static final class TestAtomicRenameFileSystemFactory implements FileSystemFactory {
+
+        @Override
+        public String getScheme() {
+            return SCHEME;
+        }
+
+        @Override
+        public FileSystem create(URI uri) throws IOException {
+            return new TestAtomicRenameFileSystem();
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..82e0c2d
--- /dev/null
+++ b/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem$TestAtomicRenameFileSystemFactory
+org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem$FailingAtomicRenameFileSystemFactory