Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/28 02:58:37 UTC

[flink-table-store] branch master updated: [FLINK-28255] Add extraFiles to DataFileMeta

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 06439061 [FLINK-28255] Add extraFiles to DataFileMeta
06439061 is described below

commit 0643906175416c2e5132fea7d3cacc934927b645
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jun 28 10:58:33 2022 +0800

    [FLINK-28255] Add extraFiles to DataFileMeta
    
    This closes #175
---
 .../flink/table/store/utils/RowDataUtils.java      | 14 +++++
 .../flink/table/store/file/data/DataFileMeta.java  | 73 +++++++++++++++++++---
 .../store/file/data/DataFileMetaSerializer.java    |  8 ++-
 .../table/store/file/manifest/ManifestFile.java    | 23 +++++++
 .../store/file/operation/FileStoreExpireImpl.java  | 73 +++++++++++++---------
 .../store/file/utils/FileStorePathFactory.java     | 25 --------
 .../table/store/file/utils/SerializationUtils.java | 10 ++-
 .../flink/table/store/file/TestFileStore.java      |  8 +--
 .../file/data/DataFileMetaSerializerTest.java      |  4 +-
 .../store/file/operation/FileStoreExpireTest.java  | 50 +++++++++++++++
 10 files changed, 215 insertions(+), 73 deletions(-)

diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
index 9f1eb027..15a2ebb3 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
@@ -43,7 +43,9 @@ import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** Utils for {@link RowData} structures. */
@@ -256,4 +258,16 @@ public class RowDataUtils {
                 throw new UnsupportedOperationException("Unsupported type: " + fieldType);
         }
     }
+
+    public static ArrayData toStringArrayData(List<String> list) {
+        return new GenericArrayData(list.stream().map(StringData::fromString).toArray());
+    }
+
+    public static List<String> fromStringArrayData(ArrayData arrayData) {
+        List<String> list = new ArrayList<>(arrayData.size());
+        for (int i = 0; i < arrayData.size(); i++) {
+            list.add(arrayData.isNullAt(i) ? null : arrayData.getString(i).toString());
+        }
+        return list;
+    }
 }
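
A note on the two helpers added above: they convert between java.util.List<String> and Flink's internal ArrayData representation, and fromStringArrayData preserves null elements via isNullAt. A minimal round-trip sketch (class name and values are illustrative, not part of the commit):

    import org.apache.flink.table.data.ArrayData;

    import java.util.Arrays;
    import java.util.List;

    import static org.apache.flink.table.store.utils.RowDataUtils.fromStringArrayData;
    import static org.apache.flink.table.store.utils.RowDataUtils.toStringArrayData;

    public class StringArrayRoundTrip {
        public static void main(String[] args) {
            List<String> extraFiles = Arrays.asList("extra1", "extra2");
            // List -> internal array format (elements become StringData)
            ArrayData arrayData = toStringArrayData(extraFiles);
            // internal array format -> List; null elements would come back as null
            List<String> restored = fromStringArrayData(arrayData);
            System.out.println(restored); // [extra1, extra2]
        }
    }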
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
index e85e3f5e..b8d70ec0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
@@ -21,17 +21,19 @@ package org.apache.flink.table.store.file.data;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.stats.BinaryTableStats;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
 import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
 import static org.apache.flink.table.store.file.utils.SerializationUtils.newBytesType;
+import static org.apache.flink.table.store.file.utils.SerializationUtils.newStringType;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Metadata of a data file. */
@@ -59,6 +61,8 @@ public class DataFileMeta {
     private final long schemaId;
     private final int level;
 
+    private final List<String> extraFiles;
+
     public static DataFileMeta forAppend(
             String fileName,
             long fileSize,
@@ -93,6 +97,34 @@ public class DataFileMeta {
             long maxSequenceNumber,
             long schemaId,
             int level) {
+        this(
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                keyStats,
+                valueStats,
+                minSequenceNumber,
+                maxSequenceNumber,
+                schemaId,
+                level,
+                Collections.emptyList());
+    }
+
+    public DataFileMeta(
+            String fileName,
+            long fileSize,
+            long rowCount,
+            BinaryRowData minKey,
+            BinaryRowData maxKey,
+            BinaryTableStats keyStats,
+            BinaryTableStats valueStats,
+            long minSequenceNumber,
+            long maxSequenceNumber,
+            long schemaId,
+            int level,
+            List<String> extraFiles) {
         this.fileName = fileName;
         this.fileSize = fileSize;
         this.rowCount = rowCount;
@@ -106,6 +138,7 @@ public class DataFileMeta {
         this.maxSequenceNumber = maxSequenceNumber;
         this.level = level;
         this.schemaId = schemaId;
+        this.extraFiles = Collections.unmodifiableList(extraFiles);
     }
 
     public String fileName() {
@@ -152,6 +185,10 @@ public class DataFileMeta {
         return level;
     }
 
+    public List<String> extraFiles() {
+        return extraFiles;
+    }
+
     public DataFileMeta upgrade(int newLevel) {
         checkArgument(newLevel > this.level);
         return new DataFileMeta(
@@ -165,7 +202,24 @@ public class DataFileMeta {
                 minSequenceNumber,
                 maxSequenceNumber,
                 schemaId,
-                newLevel);
+                newLevel,
+                extraFiles);
+    }
+
+    public DataFileMeta copy(List<String> newExtraFiles) {
+        return new DataFileMeta(
+                fileName,
+                fileSize,
+                rowCount,
+                minKey,
+                maxKey,
+                keyStats,
+                valueStats,
+                minSequenceNumber,
+                maxSequenceNumber,
+                schemaId,
+                level,
+                newExtraFiles);
     }
 
     @Override
@@ -184,7 +238,8 @@ public class DataFileMeta {
                 && minSequenceNumber == that.minSequenceNumber
                 && maxSequenceNumber == that.maxSequenceNumber
                 && schemaId == that.schemaId
-                && level == that.level;
+                && level == that.level
+                && Objects.equals(extraFiles, that.extraFiles);
     }
 
     @Override
@@ -200,13 +255,14 @@ public class DataFileMeta {
                 minSequenceNumber,
                 maxSequenceNumber,
                 schemaId,
-                level);
+                level,
+                extraFiles);
     }
 
     @Override
     public String toString() {
         return String.format(
-                "{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d, %d}",
+                "{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d, %d, %s}",
                 fileName,
                 fileSize,
                 rowCount,
@@ -217,12 +273,13 @@ public class DataFileMeta {
                 minSequenceNumber,
                 maxSequenceNumber,
                 schemaId,
-                level);
+                level,
+                extraFiles);
     }
 
     public static RowType schema() {
         List<RowType.RowField> fields = new ArrayList<>();
-        fields.add(new RowType.RowField("_FILE_NAME", new VarCharType(false, Integer.MAX_VALUE)));
+        fields.add(new RowType.RowField("_FILE_NAME", newStringType(false)));
         fields.add(new RowType.RowField("_FILE_SIZE", new BigIntType(false)));
         fields.add(new RowType.RowField("_ROW_COUNT", new BigIntType(false)));
         fields.add(new RowType.RowField("_MIN_KEY", newBytesType(false)));
@@ -233,6 +290,8 @@ public class DataFileMeta {
         fields.add(new RowType.RowField("_MAX_SEQUENCE_NUMBER", new BigIntType(false)));
         fields.add(new RowType.RowField("_SCHEMA_ID", new BigIntType(false)));
         fields.add(new RowType.RowField("_LEVEL", new IntType(false)));
+        fields.add(
+                new RowType.RowField("_EXTRA_FILES", new ArrayType(false, newStringType(false))));
         return new RowType(fields);
     }
 }
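
The old eleven-argument constructor now delegates to the new twelve-argument one with Collections.emptyList(), so existing call sites compile unchanged, and copy(List<String>) attaches extra files to an existing meta without touching any other field. A hedged construction sketch, borrowing the EMPTY_ROW and null-stats shortcuts the tests below use (all values illustrative):

    import org.apache.flink.table.store.file.data.DataFileMeta;

    import java.util.Arrays;

    import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;

    public class DataFileMetaExample {
        public static void main(String[] args) {
            // 11-arg constructor: extraFiles defaults to an immutable empty list
            DataFileMeta base =
                    new DataFileMeta(
                            "data-1", /* fileSize */ 1024L, /* rowCount */ 100L,
                            EMPTY_ROW, EMPTY_ROW, /* keyStats */ null, /* valueStats */ null,
                            /* minSequenceNumber */ 0L, /* maxSequenceNumber */ 99L,
                            /* schemaId */ 0L, /* level */ 0);

            // copy() swaps in new extra files, keeping every other field
            DataFileMeta withExtras = base.copy(Arrays.asList("data-1.extra"));
            System.out.println(withExtras.extraFiles()); // [data-1.extra]
        }
    }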
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java
index 6050a8fb..193e8e8f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.store.file.utils.ObjectSerializer;
 
 import static org.apache.flink.table.store.file.utils.SerializationUtils.deserializeBinaryRow;
 import static org.apache.flink.table.store.file.utils.SerializationUtils.serializeBinaryRow;
+import static org.apache.flink.table.store.utils.RowDataUtils.fromStringArrayData;
+import static org.apache.flink.table.store.utils.RowDataUtils.toStringArrayData;
 
 /** Serializer for {@link DataFileMeta}. */
 public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> {
@@ -49,7 +51,8 @@ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> {
                 meta.minSequenceNumber(),
                 meta.maxSequenceNumber(),
                 meta.schemaId(),
-                meta.level());
+                meta.level(),
+                toStringArrayData(meta.extraFiles()));
     }
 
     @Override
@@ -65,6 +68,7 @@ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> {
                 row.getLong(7),
                 row.getLong(8),
                 row.getLong(9),
-                row.getInt(10));
+                row.getInt(10),
+                fromStringArrayData(row.getArray(11)));
     }
 }
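
Index bookkeeping is the main risk in this change: extraFiles rides along as column 11, directly after _LEVEL at column 10, matching the _EXTRA_FILES field appended to DataFileMeta.schema(). A hedged round-trip sketch, assuming the ObjectSerializer base class exposes toRow/fromRow as the overrides above suggest, and given a meta with non-null stats (e.g. one from the test generator):

    DataFileMetaSerializer serializer = new DataFileMetaSerializer();
    DataFileMeta back = serializer.fromRow(serializer.toRow(meta));
    // _EXTRA_FILES survives the trip as column index 11
    assert back.extraFiles().equals(meta.extraFiles());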
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 8b9d43d5..f77c697b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -38,12 +38,17 @@ import org.apache.flink.table.store.file.writer.MetricFileWriter;
 import org.apache.flink.table.store.file.writer.RollingFileWriter;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.function.Supplier;
 
 /**
@@ -102,6 +107,24 @@ public class ManifestFile {
         }
     }
 
+    public Iterable<ManifestEntry> readManifestFiles(List<String> manifestFiles) {
+        Queue<String> files = new LinkedList<>(manifestFiles);
+        return Iterables.concat(
+                (Iterable<Iterable<ManifestEntry>>)
+                        () ->
+                                new Iterator<Iterable<ManifestEntry>>() {
+                                    @Override
+                                    public boolean hasNext() {
+                                        return files.size() > 0;
+                                    }
+
+                                    @Override
+                                    public Iterable<ManifestEntry> next() {
+                                        return read(files.poll());
+                                    }
+                                });
+    }
+
     /**
      * Write several {@link ManifestEntry}s into manifest files.
      *
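
readManifestFiles builds a lazily concatenated view: the outer Iterable yields one read(file) result at a time, so a consumer such as expireDataFiles holds at most one manifest file's entries in memory, and (because the queue is shared) the view is single-pass. A self-contained sketch of the same flattening pattern without the Guava dependency (names are illustrative):

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.Queue;
    import java.util.function.Function;

    public class LazyConcat {

        /** Flattens many "files" into one Iterable, reading each file only on demand. */
        static <T> Iterable<T> concatLazily(List<String> files, Function<String, List<T>> read) {
            Queue<String> queue = new LinkedList<>(files);
            return () -> new Iterator<T>() {
                private Iterator<T> current = Collections.emptyIterator();

                @Override
                public boolean hasNext() {
                    while (!current.hasNext() && !queue.isEmpty()) {
                        current = read.apply(queue.poll()).iterator(); // the read happens here
                    }
                    return current.hasNext();
                }

                @Override
                public T next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return current.next();
                }
            };
        }
    }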
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
index 690ba672..bd7bf226 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.data.DataFilePathFactory;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
@@ -34,10 +34,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
 
 /**
  * Default implementation of {@link FileStoreExpire}. It retains a certain number or period of
@@ -158,8 +162,6 @@ public class FileStoreExpireImpl implements FileStoreExpire {
         }
 
         // delete data files
-        FileStorePathFactory.DataFilePathFactoryCache dataFilePathFactoryCache =
-                new FileStorePathFactory.DataFilePathFactoryCache(pathFactory);
         // deleted data files in a snapshot are not used by that snapshot, so the range of id should
         // be (beginInclusiveId, endExclusiveId]
         for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) {
@@ -167,34 +169,12 @@ public class FileStoreExpireImpl implements FileStoreExpire {
                 LOG.debug("Ready to delete data files in snapshot #" + id);
             }
 
-            Snapshot toExpire = snapshotManager.snapshot(id);
-            List<ManifestFileMeta> deltaManifests = manifestList.read(toExpire.deltaManifestList());
-
-            // we cannot delete a data file directly when we meet a DELETE entry, because that
-            // file might be upgraded
-            Set<Path> dataFileToDelete = new HashSet<>();
-            for (ManifestFileMeta meta : deltaManifests) {
-                for (ManifestEntry entry : manifestFile.read(meta.fileName())) {
-                    DataFilePathFactory dataFilePathFactory =
-                            dataFilePathFactoryCache.getDataFilePathFactory(
-                                    entry.partition(), entry.bucket());
-                    Path dataFilePath = dataFilePathFactory.toPath(entry.file().fileName());
-                    switch (entry.kind()) {
-                        case ADD:
-                            dataFileToDelete.remove(dataFilePath);
-                            break;
-                        case DELETE:
-                            dataFileToDelete.add(dataFilePath);
-                            break;
-                        default:
-                            throw new UnsupportedOperationException(
-                                    "Unknown value kind " + entry.kind().name());
-                    }
-                }
-            }
-            for (Path dataFile : dataFileToDelete) {
-                FileUtils.deleteOrWarn(dataFile);
-            }
+            List<String> manifestFiles =
+                    manifestList.read(snapshotManager.snapshot(id).deltaManifestList()).stream()
+                            .map(ManifestFileMeta::fileName)
+                            .collect(Collectors.toList());
+            Iterable<ManifestEntry> dataFileLog = manifestFile.readManifestFiles(manifestFiles);
+            expireDataFiles(dataFileLog);
         }
 
         // delete manifests
@@ -228,6 +208,37 @@ public class FileStoreExpireImpl implements FileStoreExpire {
         writeEarliestHint(endExclusiveId);
     }
 
+    @VisibleForTesting
+    void expireDataFiles(Iterable<ManifestEntry> dataFileLog) {
+        // we cannot delete a data file directly when we meet a DELETE entry, because that
+        // file might be upgraded
+        Map<Path, List<Path>> dataFileToDelete = new HashMap<>();
+        for (ManifestEntry entry : dataFileLog) {
+            Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket());
+            Path dataFilePath = new Path(bucketPath, entry.file().fileName());
+            switch (entry.kind()) {
+                case ADD:
+                    dataFileToDelete.remove(dataFilePath);
+                    break;
+                case DELETE:
+                    List<Path> extraFiles = new ArrayList<>(entry.file().extraFiles().size());
+                    for (String file : entry.file().extraFiles()) {
+                        extraFiles.add(new Path(bucketPath, file));
+                    }
+                    dataFileToDelete.put(dataFilePath, extraFiles);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown value kind " + entry.kind().name());
+            }
+        }
+        dataFileToDelete.forEach(
+                (path, extraFiles) -> {
+                    FileUtils.deleteOrWarn(path);
+                    extraFiles.forEach(FileUtils::deleteOrWarn);
+                });
+    }
+
     private void writeEarliestHint(long earliest) {
         // update earliest hint file
 
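The ADD/DELETE bookkeeping in expireDataFiles is why a DELETE entry cannot be acted on immediately: an upgrade re-adds the same file path, so only paths whose DELETE is never cancelled by an ADD are removed, together with the extra files recorded for them (the new behavior in this commit). A tiny standalone simulation of the set logic (file names hypothetical):

    import java.util.LinkedHashSet;
    import java.util.Set;

    public class ExpireLogicDemo {
        public static void main(String[] args) {
            // delta log of one expiring snapshot: f1 is upgraded, f2 is really dropped
            String[][] log = {{"DELETE", "f1"}, {"ADD", "f1"}, {"DELETE", "f2"}};
            Set<String> toDelete = new LinkedHashSet<>();
            for (String[] entry : log) {
                if ("ADD".equals(entry[0])) {
                    toDelete.remove(entry[1]); // a later ADD rescues the file
                } else {
                    toDelete.add(entry[1]);
                }
            }
            System.out.println(toDelete); // [f2] -- f1 survives its upgrade
        }
    }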
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index dff608c4..b8fafc96 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -33,8 +33,6 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.concurrent.ThreadSafe;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -131,27 +129,4 @@ public class FileStorePathFactory {
     public String uuid() {
         return uuid;
     }
-
-    /** Cache for storing {@link DataFilePathFactory}s. */
-    public static class DataFilePathFactoryCache {
-
-        private final FileStorePathFactory pathFactory;
-        private final Map<BinaryRowData, Map<Integer, DataFilePathFactory>> cache;
-
-        public DataFilePathFactoryCache(FileStorePathFactory pathFactory) {
-            this.pathFactory = pathFactory;
-            this.cache = new HashMap<>();
-        }
-
-        public DataFilePathFactory getDataFilePathFactory(BinaryRowData partition, int bucket) {
-            return cache.compute(partition, (p, m) -> m == null ? new HashMap<>() : m)
-                    .compute(
-                            bucket,
-                            (b, f) ->
-                                    f == null
-                                            ? pathFactory.createDataFilePathFactory(
-                                                    partition, bucket)
-                                            : f);
-        }
-    }
 }
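
The removed cache is no longer needed because pathFactory.bucketPath(partition, bucket) already yields the parent directory of every data file in that bucket; call sites now build paths with a single concatenation, as the FileStoreExpireImpl and TestFileStore hunks show:

    // before: dataFilePathFactoryCache.getDataFilePathFactory(partition, bucket).toPath(fileName)
    // after: one Path concatenation, no per-partition/per-bucket state to cache
    Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket());
    Path dataFilePath = new Path(bucketPath, entry.file().fileName());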
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SerializationUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SerializationUtils.java
index b78eb5c5..e18cedd3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SerializationUtils.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SerializationUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
 import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -58,9 +59,14 @@ public class SerializationUtils {
         return buf;
     }
 
-    /** Create a bytes type VarBinaryType(Integer.MAX_VALUE). */
+    /** Create a bytes type VarBinaryType(VarBinaryType.MAX_LENGTH). */
     public static VarBinaryType newBytesType(boolean isNullable) {
-        return new VarBinaryType(isNullable, Integer.MAX_VALUE);
+        return new VarBinaryType(isNullable, VarBinaryType.MAX_LENGTH);
+    }
+
+    /** Create a varchar type VarCharType(VarCharType.MAX_LENGTH). */
+    public static VarCharType newStringType(boolean isNullable) {
+        return new VarCharType(isNullable, VarCharType.MAX_LENGTH);
     }
 
     /**
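
Both constants equal Integer.MAX_VALUE (VarCharType.MAX_LENGTH and VarBinaryType.MAX_LENGTH are defined that way in Flink), so switching to them changes readability, not the serialized format. Typical usage, as in DataFileMeta.schema() above:

    // _FILE_NAME is a non-nullable string of maximum length
    fields.add(new RowType.RowField("_FILE_NAME", newStringType(false)));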
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index a2f0a5df..1c4ff107 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -359,8 +359,6 @@ public class TestFileStore extends KeyValueFileStore {
         FileStorePathFactory pathFactory = pathFactory();
         ManifestList manifestList = manifestListFactory().create();
         FileStoreScan scan = newScan();
-        FileStorePathFactory.DataFilePathFactoryCache dataFilePathFactoryCache =
-                new FileStorePathFactory.DataFilePathFactoryCache(pathFactory);
 
         SnapshotManager snapshotManager = snapshotManager();
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
@@ -396,9 +394,9 @@ public class TestFileStore extends KeyValueFileStore {
             List<ManifestEntry> entries = scan.withManifestList(manifests).plan().files();
             for (ManifestEntry entry : entries) {
                 result.add(
-                        dataFilePathFactoryCache
-                                .getDataFilePathFactory(entry.partition(), entry.bucket())
-                                .toPath(entry.file().fileName()));
+                        new Path(
+                                pathFactory.bucketPath(entry.partition(), entry.bucket()),
+                                entry.file().fileName()));
             }
         }
         return result;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileMetaSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileMetaSerializerTest.java
index c929e4df..c7cf19a8 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileMetaSerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileMetaSerializerTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.data;
 
 import org.apache.flink.table.store.file.utils.ObjectSerializerTestBase;
 
+import java.util.Arrays;
+
 /** Tests for {@link DataFileMetaSerializer}. */
 public class DataFileMetaSerializerTest extends ObjectSerializerTestBase<DataFileMeta> {
 
@@ -32,6 +34,6 @@ public class DataFileMetaSerializerTest extends ObjectSerializerTestBase<DataFil
 
     @Override
     protected DataFileMeta object() {
-        return gen.next().meta;
+        return gen.next().meta.copy(Arrays.asList("extra1", "extra2"));
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
index 7b9bc4b0..999be1c9 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java
@@ -18,13 +18,18 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 
 import org.junit.jupiter.api.AfterEach;
@@ -34,10 +39,12 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link FileStoreExpireImpl}. */
@@ -68,6 +75,49 @@ public class FileStoreExpireTest {
         store.assertCleaned();
     }
 
+    @Test
+    public void testExpireExtraFiles() throws IOException {
+        FileStoreExpireImpl expire = store.newExpire(1, 3, Long.MAX_VALUE);
+
+        // write test files
+        BinaryRowData partition = gen.getPartition(gen.next());
+        Path bucketPath = store.pathFactory().bucketPath(partition, 0);
+        Path myDataFile = new Path(bucketPath, "myDataFile");
+        FileUtils.writeFileUtf8(myDataFile, "1");
+        Path extra1 = new Path(bucketPath, "extra1");
+        FileUtils.writeFileUtf8(extra1, "2");
+        Path extra2 = new Path(bucketPath, "extra2");
+        FileUtils.writeFileUtf8(extra2, "3");
+
+        // create DataFileMeta and ManifestEntry
+        List<String> extraFiles = Arrays.asList("extra1", "extra2");
+        DataFileMeta dataFile =
+                new DataFileMeta(
+                        "myDataFile",
+                        1,
+                        1,
+                        EMPTY_ROW,
+                        EMPTY_ROW,
+                        null,
+                        null,
+                        0,
+                        1,
+                        0,
+                        0,
+                        extraFiles);
+        ManifestEntry add = new ManifestEntry(ValueKind.ADD, partition, 0, 1, dataFile);
+        ManifestEntry delete = new ManifestEntry(ValueKind.DELETE, partition, 0, 1, dataFile);
+
+        // expire
+        expire.expireDataFiles(Arrays.asList(add, delete));
+
+        // check
+        FileSystem fs = myDataFile.getFileSystem();
+        assertThat(fs.exists(myDataFile)).isFalse();
+        assertThat(fs.exists(extra1)).isFalse();
+        assertThat(fs.exists(extra2)).isFalse();
+    }
+
     @Test
     public void testNoSnapshot() {
         FileStoreExpire expire = store.newExpire(1, 3, Long.MAX_VALUE);