You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/28 02:58:37 UTC
[flink-table-store] branch master updated: [FLINK-28255] Add extraFiles to DataFileMeta
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 06439061 [FLINK-28255] Add extraFiles to DataFileMeta
06439061 is described below
commit 0643906175416c2e5132fea7d3cacc934927b645
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jun 28 10:58:33 2022 +0800
[FLINK-28255] Add extraFiles to DataFileMeta
This closes #175
---
.../flink/table/store/utils/RowDataUtils.java | 14 +++++
.../flink/table/store/file/data/DataFileMeta.java | 73 +++++++++++++++++++---
.../store/file/data/DataFileMetaSerializer.java | 8 ++-
.../table/store/file/manifest/ManifestFile.java | 23 +++++++
.../store/file/operation/FileStoreExpireImpl.java | 73 +++++++++++++---------
.../store/file/utils/FileStorePathFactory.java | 25 --------
.../table/store/file/utils/SerializationUtils.java | 10 ++-
.../flink/table/store/file/TestFileStore.java | 8 +--
.../file/data/DataFileMetaSerializerTest.java | 4 +-
.../store/file/operation/FileStoreExpireTest.java | 50 +++++++++++++++
10 files changed, 215 insertions(+), 73 deletions(-)
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
index 9f1eb027..15a2ebb3 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
@@ -43,7 +43,9 @@ import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/** Utils for {@link RowData} structures. */
@@ -256,4 +258,16 @@ public class RowDataUtils {
throw new UnsupportedOperationException("Unsupported type: " + fieldType);
}
}
+
+    /**
+     * Converts a list of Java strings into an {@link ArrayData} whose elements are produced by
+     * {@link StringData#fromString}, in list order.
+     */
+    public static ArrayData toStringArrayData(List<String> list) {
+        Object[] elements = new Object[list.size()];
+        int index = 0;
+        for (String value : list) {
+            elements[index++] = StringData.fromString(value);
+        }
+        return new GenericArrayData(elements);
+    }
+
+    /**
+     * Converts an {@link ArrayData} of strings into a list of Java strings, in element order.
+     * Null elements of the array are kept as {@code null} entries of the list.
+     */
+    public static List<String> fromStringArrayData(ArrayData arrayData) {
+        int size = arrayData.size();
+        List<String> strings = new ArrayList<>(size);
+        for (int pos = 0; pos < size; pos++) {
+            String value = null;
+            if (!arrayData.isNullAt(pos)) {
+                value = arrayData.getString(pos).toString();
+            }
+            strings.add(value);
+        }
+        return strings;
+    }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
index e85e3f5e..b8d70ec0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
@@ -21,17 +21,19 @@ package org.apache.flink.table.store.file.data;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.stats.BinaryTableStats;
import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
import static org.apache.flink.table.store.file.utils.SerializationUtils.newBytesType;
+import static org.apache.flink.table.store.file.utils.SerializationUtils.newStringType;
import static org.apache.flink.util.Preconditions.checkArgument;
/** Metadata of a data file. */
@@ -59,6 +61,8 @@ public class DataFileMeta {
private final long schemaId;
private final int level;
+ private final List<String> extraFiles;
+
public static DataFileMeta forAppend(
String fileName,
long fileSize,
@@ -93,6 +97,34 @@ public class DataFileMeta {
long maxSequenceNumber,
long schemaId,
int level) {
+ this(
+ fileName,
+ fileSize,
+ rowCount,
+ minKey,
+ maxKey,
+ keyStats,
+ valueStats,
+ minSequenceNumber,
+ maxSequenceNumber,
+ schemaId,
+ level,
+ Collections.emptyList());
+ }
+
+ public DataFileMeta(
+ String fileName,
+ long fileSize,
+ long rowCount,
+ BinaryRowData minKey,
+ BinaryRowData maxKey,
+ BinaryTableStats keyStats,
+ BinaryTableStats valueStats,
+ long minSequenceNumber,
+ long maxSequenceNumber,
+ long schemaId,
+ int level,
+ List<String> extraFiles) {
this.fileName = fileName;
this.fileSize = fileSize;
this.rowCount = rowCount;
@@ -106,6 +138,7 @@ public class DataFileMeta {
this.maxSequenceNumber = maxSequenceNumber;
this.level = level;
this.schemaId = schemaId;
+ this.extraFiles = Collections.unmodifiableList(extraFiles);
}
public String fileName() {
@@ -152,6 +185,10 @@ public class DataFileMeta {
return level;
}
+ /** Returns the extra files recorded in this meta; the constructor stores them as an unmodifiable list. */
+ public List<String> extraFiles() {
+ return extraFiles;
+ }
+
public DataFileMeta upgrade(int newLevel) {
checkArgument(newLevel > this.level);
return new DataFileMeta(
@@ -165,7 +202,24 @@ public class DataFileMeta {
minSequenceNumber,
maxSequenceNumber,
schemaId,
- newLevel);
+ newLevel,
+ extraFiles);
+ }
+
+ /**
+ * Creates a new {@link DataFileMeta} that carries over every field of this one unchanged,
+ * except for the extra files, which are replaced by {@code newExtraFiles}.
+ *
+ * @param newExtraFiles extra files of the returned meta
+ * @return a new meta instance; this instance is not modified
+ */
+ public DataFileMeta copy(List<String> newExtraFiles) {
+ return new DataFileMeta(
+ fileName,
+ fileSize,
+ rowCount,
+ minKey,
+ maxKey,
+ keyStats,
+ valueStats,
+ minSequenceNumber,
+ maxSequenceNumber,
+ schemaId,
+ level,
+ newExtraFiles);
}
@Override
@@ -184,7 +238,8 @@ public class DataFileMeta {
&& minSequenceNumber == that.minSequenceNumber
&& maxSequenceNumber == that.maxSequenceNumber
&& schemaId == that.schemaId
- && level == that.level;
+ && level == that.level
+ && Objects.equals(extraFiles, that.extraFiles);
}
@Override
@@ -200,13 +255,14 @@ public class DataFileMeta {
minSequenceNumber,
maxSequenceNumber,
schemaId,
- level);
+ level,
+ extraFiles);
}
@Override
public String toString() {
return String.format(
- "{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d, %d}",
+ "{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d, %d, %s}",
fileName,
fileSize,
rowCount,
@@ -217,12 +273,13 @@ public class DataFileMeta {
minSequenceNumber,
maxSequenceNumber,
schemaId,
- level);
+ level,
+ extraFiles);
}
public static RowType schema() {
List<RowType.RowField> fields = new ArrayList<>();
- fields.add(new RowType.RowField("_FILE_NAME", new VarCharType(false, Integer.MAX_VALUE)));
+ fields.add(new RowType.RowField("_FILE_NAME", newStringType(false)));
fields.add(new RowType.RowField("_FILE_SIZE", new BigIntType(false)));
fields.add(new RowType.RowField("_ROW_COUNT", new BigIntType(false)));
fields.add(new RowType.RowField("_MIN_KEY", newBytesType(false)));
@@ -233,6 +290,8 @@ public class DataFileMeta {
fields.add(new RowType.RowField("_MAX_SEQUENCE_NUMBER", new BigIntType(false)));
fields.add(new RowType.RowField("_SCHEMA_ID", new BigIntType(false)));
fields.add(new RowType.RowField("_LEVEL", new IntType(false)));
+ fields.add(
+ new RowType.RowField("_EXTRA_FILES", new ArrayType(false, newStringType(false))));
return new RowType(fields);
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java
index 6050a8fb..193e8e8f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.store.file.utils.ObjectSerializer;
import static org.apache.flink.table.store.file.utils.SerializationUtils.deserializeBinaryRow;
import static org.apache.flink.table.store.file.utils.SerializationUtils.serializeBinaryRow;
+import static org.apache.flink.table.store.utils.RowDataUtils.fromStringArrayData;
+import static org.apache.flink.table.store.utils.RowDataUtils.toStringArrayData;
/** Serializer for {@link DataFileMeta}. */
public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> {
@@ -49,7 +51,8 @@ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> {
meta.minSequenceNumber(),
meta.maxSequenceNumber(),
meta.schemaId(),
- meta.level());
+ meta.level(),
+ toStringArrayData(meta.extraFiles()));
}
@Override
@@ -65,6 +68,7 @@ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> {
row.getLong(7),
row.getLong(8),
row.getLong(9),
- row.getInt(10));
+ row.getInt(10),
+ fromStringArrayData(row.getArray(11)));
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 8b9d43d5..f77c697b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -38,12 +38,17 @@ import org.apache.flink.table.store.file.writer.MetricFileWriter;
import org.apache.flink.table.store.file.writer.RollingFileWriter;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
import java.util.function.Supplier;
/**
@@ -102,6 +107,24 @@ public class ManifestFile {
}
}
+    /**
+     * Lazily reads the entries of several manifest files, in the order the file names are given.
+     *
+     * <p>A manifest file is only read (via {@code read}) when iteration reaches it. The file
+     * names are copied on entry, and every call to {@code iterator()} on the result starts again
+     * from the first file, so the returned {@link Iterable} can be traversed more than once.
+     *
+     * @param manifestFiles names of the manifest files to read
+     * @return a lazy view over the concatenated entries of all given manifest files
+     */
+    public Iterable<ManifestEntry> readManifestFiles(List<String> manifestFiles) {
+        // Copy up front so that later changes to the caller's list are not observed.
+        List<String> fileNames = new LinkedList<>(manifestFiles);
+        return Iterables.concat(
+                (Iterable<Iterable<ManifestEntry>>)
+                        () -> {
+                            // One queue per iterator() call: draining it must not exhaust the
+                            // Iterable itself, otherwise a second traversal would be empty.
+                            Queue<String> pending = new LinkedList<>(fileNames);
+                            return new Iterator<Iterable<ManifestEntry>>() {
+                                @Override
+                                public boolean hasNext() {
+                                    return !pending.isEmpty();
+                                }
+
+                                @Override
+                                public Iterable<ManifestEntry> next() {
+                                    // remove() throws NoSuchElementException once drained, as
+                                    // the Iterator contract requires; poll() would hand null
+                                    // to read() instead.
+                                    return read(pending.remove());
+                                }
+                            };
+                        });
+    }
+
/**
* Write several {@link ManifestEntry}s into manifest files.
*
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
index 690ba672..bd7bf226 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
@@ -18,9 +18,9 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.data.DataFilePathFactory;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
@@ -34,10 +34,14 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
/**
* Default implementation of {@link FileStoreExpire}. It retains a certain number or period of
@@ -158,8 +162,6 @@ public class FileStoreExpireImpl implements FileStoreExpire {
}
// delete data files
- FileStorePathFactory.DataFilePathFactoryCache dataFilePathFactoryCache =
- new FileStorePathFactory.DataFilePathFactoryCache(pathFactory);
// deleted data files in a snapshot are not used by that snapshot, so the range of id should
// be (beginInclusiveId, endExclusiveId]
for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) {
@@ -167,34 +169,12 @@ public class FileStoreExpireImpl implements FileStoreExpire {
LOG.debug("Ready to delete data files in snapshot #" + id);
}
- Snapshot toExpire = snapshotManager.snapshot(id);
- List<ManifestFileMeta> deltaManifests = manifestList.read(toExpire.deltaManifestList());
-
- // we cannot delete a data file directly when we meet a DELETE entry, because that
- // file might be upgraded
- Set<Path> dataFileToDelete = new HashSet<>();
- for (ManifestFileMeta meta : deltaManifests) {
- for (ManifestEntry entry : manifestFile.read(meta.fileName())) {
- DataFilePathFactory dataFilePathFactory =
- dataFilePathFactoryCache.getDataFilePathFactory(
- entry.partition(), entry.bucket());
- Path dataFilePath = dataFilePathFactory.toPath(entry.file().fileName());
- switch (entry.kind()) {
- case ADD:
- dataFileToDelete.remove(dataFilePath);
- break;
- case DELETE:
- dataFileToDelete.add(dataFilePath);
- break;
- default:
- throw new UnsupportedOperationException(
- "Unknown value kind " + entry.kind().name());
- }
- }
- }
- for (Path dataFile : dataFileToDelete) {
- FileUtils.deleteOrWarn(dataFile);
- }
+ List<String> manifestFiles =
+ manifestList.read(snapshotManager.snapshot(id).deltaManifestList()).stream()
+ .map(ManifestFileMeta::fileName)
+ .collect(Collectors.toList());
+ Iterable<ManifestEntry> dataFileLog = manifestFile.readManifestFiles(manifestFiles);
+ expireDataFiles(dataFileLog);
}
// delete manifests
@@ -228,6 +208,37 @@ public class FileStoreExpireImpl implements FileStoreExpire {
writeEarliestHint(endExclusiveId);
}
+    /**
+     * Deletes the data files, together with their extra files, that are still marked as deleted
+     * after replaying the given manifest entries.
+     *
+     * <p>Entries are processed in iteration order: a DELETE entry marks a data file for removal
+     * and a later ADD entry for the same path withdraws that mark. Nothing is removed from the
+     * file system until the whole log has been processed.
+     *
+     * @param dataFileLog manifest entries to replay
+     */
+    @VisibleForTesting
+    void expireDataFiles(Iterable<ManifestEntry> dataFileLog) {
+        // a DELETE entry alone is not enough to remove a data file right away, because that
+        // file might be upgraded and show up again in a later ADD entry
+        Map<Path, List<Path>> pendingDeletes = new HashMap<>();
+        for (ManifestEntry entry : dataFileLog) {
+            Path bucketDir = pathFactory.bucketPath(entry.partition(), entry.bucket());
+            Path dataFile = new Path(bucketDir, entry.file().fileName());
+            switch (entry.kind()) {
+                case ADD:
+                    pendingDeletes.remove(dataFile);
+                    break;
+                case DELETE:
+                    // extra files are resolved against the same bucket directory
+                    pendingDeletes.put(
+                            dataFile,
+                            entry.file().extraFiles().stream()
+                                    .map(name -> new Path(bucketDir, name))
+                                    .collect(Collectors.toList()));
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown value kind " + entry.kind().name());
+            }
+        }
+        for (Map.Entry<Path, List<Path>> pending : pendingDeletes.entrySet()) {
+            FileUtils.deleteOrWarn(pending.getKey());
+            for (Path extraFile : pending.getValue()) {
+                FileUtils.deleteOrWarn(extraFile);
+            }
+        }
+    }
+
private void writeEarliestHint(long earliest) {
// update earliest hint file
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index dff608c4..b8fafc96 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -33,8 +33,6 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.concurrent.ThreadSafe;
-import java.util.HashMap;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@@ -131,27 +129,4 @@ public class FileStorePathFactory {
public String uuid() {
return uuid;
}
-
- /** Cache for storing {@link DataFilePathFactory}s. */
- public static class DataFilePathFactoryCache {
-
- private final FileStorePathFactory pathFactory;
- private final Map<BinaryRowData, Map<Integer, DataFilePathFactory>> cache;
-
- public DataFilePathFactoryCache(FileStorePathFactory pathFactory) {
- this.pathFactory = pathFactory;
- this.cache = new HashMap<>();
- }
-
- public DataFilePathFactory getDataFilePathFactory(BinaryRowData partition, int bucket) {
- return cache.compute(partition, (p, m) -> m == null ? new HashMap<>() : m)
- .compute(
- bucket,
- (b, f) ->
- f == null
- ? pathFactory.createDataFilePathFactory(
- partition, bucket)
- : f);
- }
- }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SerializationUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SerializationUtils.java
index b78eb5c5..e18cedd3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SerializationUtils.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SerializationUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
import java.io.EOFException;
import java.io.IOException;
@@ -58,9 +59,14 @@ public class SerializationUtils {
return buf;
}
- /** Create a bytes type VarBinaryType(Integer.MAX_VALUE). */
+ /** Create a bytes type VarBinaryType(VarBinaryType.MAX_LENGTH). */
public static VarBinaryType newBytesType(boolean isNullable) {
- return new VarBinaryType(isNullable, Integer.MAX_VALUE);
+ return new VarBinaryType(isNullable, VarBinaryType.MAX_LENGTH);
+ }
+
+ /**
+ * Create a varchar type VarCharType(VarCharType.MAX_LENGTH).
+ *
+ * @param isNullable nullability passed on to the created type
+ * @return a new {@link VarCharType} of length {@code VarCharType.MAX_LENGTH}
+ */
+ public static VarCharType newStringType(boolean isNullable) {
+ return new VarCharType(isNullable, VarCharType.MAX_LENGTH);
+ }
/**
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index a2f0a5df..1c4ff107 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -359,8 +359,6 @@ public class TestFileStore extends KeyValueFileStore {
FileStorePathFactory pathFactory = pathFactory();
ManifestList manifestList = manifestListFactory().create();
FileStoreScan scan = newScan();
- FileStorePathFactory.DataFilePathFactoryCache dataFilePathFactoryCache =
- new FileStorePathFactory.DataFilePathFactoryCache(pathFactory);
SnapshotManager snapshotManager = snapshotManager();
Long latestSnapshotId = snapshotManager.latestSnapshotId();
@@ -396,9 +394,9 @@ public class TestFileStore extends KeyValueFileStore {
List<ManifestEntry> entries = scan.withManifestList(manifests).plan().files();
for (ManifestEntry entry : entries) {
result.add(
- dataFilePathFactoryCache
- .getDataFilePathFactory(entry.partition(), entry.bucket())
- .toPath(entry.file().fileName()));
+ new Path(
+ pathFactory.bucketPath(entry.partition(), entry.bucket()),
+ entry.file().fileName()));
}
}
return result;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileMetaSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileMetaSerializerTest.java
index c929e4df..c7cf19a8 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileMetaSerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileMetaSerializerTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.data;
import org.apache.flink.table.store.file.utils.ObjectSerializerTestBase;
+import java.util.Arrays;
+
/** Tests for {@link DataFileMetaSerializer}. */
public class DataFileMetaSerializerTest extends ObjectSerializerTestBase<DataFileMeta> {
@@ -32,6 +34,6 @@ public class DataFileMetaSerializerTest extends ObjectSerializerTestBase<DataFil
@Override
protected DataFileMeta object() {
- return gen.next().meta;
+ return gen.next().meta.copy(Arrays.asList("extra1", "extra2"));
}
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index 7b9bc4b0..999be1c9 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -18,13 +18,18 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.TestFileStore;
import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.junit.jupiter.api.AfterEach;
@@ -34,10 +39,12 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link FileStoreExpireImpl}. */
@@ -68,6 +75,49 @@ public class FileStoreExpireTest {
store.assertCleaned();
}
+ /**
+ * Checks that {@code expireDataFiles} removes a deleted data file together with the extra
+ * files listed in its {@link DataFileMeta}.
+ */
+ @Test
+ public void testExpireExtraFiles() throws IOException {
+ FileStoreExpireImpl expire = store.newExpire(1, 3, Long.MAX_VALUE);
+
+ // write test files: one data file and two extra files, all in the same bucket directory
+ BinaryRowData partition = gen.getPartition(gen.next());
+ Path bucketPath = store.pathFactory().bucketPath(partition, 0);
+ Path myDataFile = new Path(bucketPath, "myDataFile");
+ FileUtils.writeFileUtf8(myDataFile, "1");
+ Path extra1 = new Path(bucketPath, "extra1");
+ FileUtils.writeFileUtf8(extra1, "2");
+ Path extra2 = new Path(bucketPath, "extra2");
+ FileUtils.writeFileUtf8(extra2, "3");
+
+ // create DataFileMeta and ManifestEntry; the same meta is used for the ADD and the DELETE
+ // entry, and the extra files are given by name only
+ List<String> extraFiles = Arrays.asList("extra1", "extra2");
+ DataFileMeta dataFile =
+ new DataFileMeta(
+ "myDataFile",
+ 1,
+ 1,
+ EMPTY_ROW,
+ EMPTY_ROW,
+ null,
+ null,
+ 0,
+ 1,
+ 0,
+ 0,
+ extraFiles);
+ ManifestEntry add = new ManifestEntry(ValueKind.ADD, partition, 0, 1, dataFile);
+ ManifestEntry delete = new ManifestEntry(ValueKind.DELETE, partition, 0, 1, dataFile);
+
+ // expire: the log is ADD followed by DELETE, so the last entry for the file is a DELETE
+ expire.expireDataFiles(Arrays.asList(add, delete));
+
+ // check: neither the data file nor its extra files exist any more
+ FileSystem fs = myDataFile.getFileSystem();
+ assertThat(fs.exists(myDataFile)).isFalse();
+ assertThat(fs.exists(extra1)).isFalse();
+ assertThat(fs.exists(extra2)).isFalse();
+ }
+
@Test
public void testNoSnapshot() {
FileStoreExpire expire = store.newExpire(1, 3, Long.MAX_VALUE);