Posted to issues@paimon.apache.org by "yuzelin (via GitHub)" <gi...@apache.org> on 2023/11/21 04:27:54 UTC

[PR] [flink] Support remove-orphan-files action [incubator-paimon]

yuzelin opened a new pull request, #2357:
URL: https://github.com/apache/incubator-paimon/pull/2357

   <!-- Please specify the module before the PR name: [core] ... or [flink] ... -->
   
   ### Purpose
   
   <!-- Linking this pull request to the issue -->
   Linked issue: close #xxx
   
   <!-- What is the purpose of the change -->
   
   ### Tests
   
   <!-- List UT and IT cases to verify this change -->
   
   ### API and Format
   
   <!-- Does this change affect API or storage format -->
   
   ### Documentation
   
   <!-- Does this change introduce a new feature -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [flink] Support remove-orphan-files action [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2357:
URL: https://github.com/apache/incubator-paimon/pull/2357#discussion_r1400168469


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java:
##########
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Removes orphan data files and metadata files.
+ *
+ * <p>To avoid deleting newly written files, it only deletes orphan files older than 1 day by
+ * default. The threshold can be changed with {@link #olderThan}.
+ *
+ * <p>To avoid conflicting with snapshot expiration, tag deletion and rollback, it skips
+ * snapshots/tags that no longer exist when listing used files.
+ *
+ * <p>To avoid mistakenly deleting files that are in use but could not be read, it stops the
+ * removal process when it fails to read used files.
+ *
+ * <p>Some ideas are borrowed from Iceberg's {@code DeleteOrphanFilesSparkAction}.
+ */
+public class RemoveOrphanFilesAction extends TableActionBase {

Review Comment:
   move this to paimon-core
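Moving the action to paimon-core implies that the orphan-detection rule itself should not depend on Flink. Below is a minimal engine-independent sketch of that rule, assuming a hypothetical `FileInfo` holder in place of Paimon's `FileStatus`. A file is a deletion candidate only if no snapshot or tag references its name and it was last modified before the cutoff, which is what the `ALL`/`USED` SQL query plus the `olderThan` filter compute in this PR.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Engine-independent sketch of the orphan-file rule (no Flink dependency). */
public class OrphanFileRule {

    /** Hypothetical listing entry: full path, bare file name, modification time in millis. */
    public static final class FileInfo {
        final String path;
        final String fileName;
        final long modificationTimeMillis;

        public FileInfo(String path, String fileName, long modificationTimeMillis) {
            this.path = path;
            this.fileName = fileName;
            this.modificationTimeMillis = modificationTimeMillis;
        }
    }

    /**
     * Returns the paths of files that are older than the cutoff and whose names are not
     * referenced by any snapshot or tag.
     */
    public static List<String> findOrphans(
            List<FileInfo> allFiles, Set<String> usedFileNames, long olderThanMillis) {
        return allFiles.stream()
                // Newly written files may not be referenced by a snapshot yet: never touch them.
                .filter(f -> f.modificationTimeMillis < olderThanMillis)
                // Equivalent to `fileName NOT IN (SELECT fileName FROM USED)`.
                .filter(f -> !usedFileNames.contains(f.fileName))
                .map(f -> f.path)
                .collect(Collectors.toList());
    }
}
```

Keeping the rule as a pure function over two in-memory collections makes it easy to unit-test in paimon-core, whichever engine or thread pool produces the listings.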



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java:
##########
@@ -0,0 +1,522 @@
+public class RemoveOrphanFilesAction extends TableActionBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
+
+    private static final List<String> PAIMON_FILE_PREFIX =
+            Arrays.asList(
+                    DataFilePathFactory.DATA_FILE_PREFIX,
+                    DataFilePathFactory.CHANGELOG_FILE_PREFIX,
+                    FileStorePathFactory.MANIFEST_FILE_PREFIX,
+                    FileStorePathFactory.MANIFEST_LIST_PREFIX,
+                    FileStorePathFactory.INDEX_MANIFEST_PREFIX,
+                    FileStorePathFactory.INDEX_FILE_PREFIX);
+
+    private static final int READ_FILE_RETRY_NUM = 3;
+    private static final int READ_FILE_RETRY_INTERVAL = 5;
+
+    private long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
+    private int deletedFileNums;
+
+    RemoveOrphanFilesAction(
+            String warehouse,
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig) {
+        super(warehouse, databaseName, tableName, catalogConfig);
+    }
+
+    public RemoveOrphanFilesAction olderThan(String timestamp) {
+        // The FileStatus#getModificationTime returns milliseconds
+        this.olderThanMillis = DateTimeUtils.parseTimestampData(timestamp, 3).getMillisecond();
+        return this;
+    }
+
+    @VisibleForTesting
+    public int getDeletedFileNums() {
+        return deletedFileNums;
+    }
+
+    @Override
+    public void run() throws Exception {
+        checkArgument(
+                table instanceof FileStoreTable,
+                "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
+                table.getClass().getName());
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+        if (snapshotManager.earliestSnapshotId() == null) {
+            LOG.info("No snapshot found, skip removing.");
+            return;
+        }
+
+        buildUsedFilesView(fileStoreTable);
+        buildAllFilesView(fileStoreTable);
+
+        // find not used files
+        String query =
+                "SELECT path FROM `ALL` WHERE `ALL`.fileName NOT IN (SELECT fileName FROM `USED`)";
+        CloseableIterator<Row> iterator = batchTEnv.executeSql(query).collect();
+
+        while (iterator.hasNext()) {
+            String fileName = iterator.next().getFieldAs(0);
+            fileStoreTable.fileIO().deleteQuietly(new Path(fileName));
+            deletedFileNums++;
+        }
+
+        iterator.close();
+    }
+
+    /**
+     * Lists all files used by snapshots and tags and stores them in the view
+     * USED(fileName STRING).
+     */
+    private void buildUsedFilesView(FileStoreTable table) throws IOException {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TagManager tagManager = table.tagManager();
+
+        long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+        long latestSnapshotId = snapshotManager.latestSnapshotId();
+
+        SortedMap<Snapshot, String> tags = tagManager.tags();
+        List<Snapshot> taggedSnapshots = new ArrayList<>(tags.keySet());
+        if (!tags.isEmpty()) {
+            earliestSnapshotId = Math.min(earliestSnapshotId, taggedSnapshots.get(0).id());
+            latestSnapshotId =
+                    Math.max(latestSnapshotId, taggedSnapshots.get(tags.size() - 1).id());
+        }
+
+        FileStore<?> store = table.store();
+        ManifestList manifestList = store.manifestListFactory().create();
+        ManifestFile manifestFile = store.manifestFileFactory().create();
+        IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+        Set<String> usedFiles = new HashSet<>();
+        for (long id = latestSnapshotId; id >= earliestSnapshotId; id--) {
+            Snapshot currentSnapshot = findSnapshot(snapshotManager, tagManager, tags, id);
+
+            if (currentSnapshot == null) {
+                continue;
+            }
+
+            List<String> usedByCurrent = new ArrayList<>();
+            addManifestList(usedByCurrent, currentSnapshot);
+
+            // try to read manifests
+            ReadingResult<ManifestFileMeta> manifestFileMetas =
+                    retryReadingFiles(() -> currentSnapshot.allManifests(manifestList));
+            if (manifestFileMetas.hasException()) {
+                checkSnapshotAndTag(
+                        currentSnapshot,
+                        snapshotManager,
+                        tagManager,
+                        tags,
+                        manifestFileMetas.originalException());
+                continue;
+            }
+            List<String> manifestFileName =
+                    manifestFileMetas.result().stream()
+                            .map(ManifestFileMeta::fileName)
+                            .collect(Collectors.toList());
+            usedByCurrent.addAll(manifestFileName);
+
+            // try to read data files
+            ReadingResult<String> dataFiles = retryReadingDataFiles(manifestFileName, manifestFile);
+            if (dataFiles.hasException()) {
+                checkSnapshotAndTag(
+                        currentSnapshot,
+                        snapshotManager,
+                        tagManager,
+                        tags,
+                        dataFiles.originalException());
+                continue;
+            }
+            usedByCurrent.addAll(dataFiles.result());
+
+            // try to read index files
+            String indexManifest = currentSnapshot.indexManifest();
+            if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
+                usedByCurrent.add(indexManifest);
+
+                ReadingResult<IndexManifestEntry> indexManifestEntries =
+                        retryReadingFiles(() -> indexFileHandler.readManifest(indexManifest));
+                if (indexManifestEntries.hasException()) {
+                    checkSnapshotAndTag(
+                            currentSnapshot,
+                            snapshotManager,
+                            tagManager,
+                            tags,
+                            indexManifestEntries.originalException());
+                    continue;
+                }
+
+                indexManifestEntries.result().stream()
+                        .map(IndexManifestEntry::indexFile)
+                        .map(IndexFileMeta::fileName)
+                        .forEach(usedByCurrent::add);
+            }
+
+            usedFiles.addAll(usedByCurrent);
+        }
+
+        RowTypeInfo rowTypeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO},
+                        new String[] {"fileName"});
+        DataStream<Row> usedFileDataStream =
+                env.fromCollection(usedFiles)
+                        .flatMap(
+                                (FlatMapFunction<String, Row>)
+                                        (value, out) -> out.collect(Row.of(value)))
+                        .returns(rowTypeInfo);
+        batchTEnv.createTemporaryView("USED", usedFileDataStream);
+    }
+
+    @Nullable
+    private Snapshot findSnapshot(
+            SnapshotManager snapshotManager,
+            TagManager tagManager,
+            SortedMap<Snapshot, String> tags,
+            long targetId) {
+        if (snapshotManager.snapshotExists(targetId)) {
+            return snapshotManager.snapshot(targetId);
+        } else {
+            Snapshot snapshot = TagManager.find(tags, targetId);
+            if (snapshot != null && tagManager.tagExists(tags.get(snapshot))) {
+                return snapshot;
+            } else {
+                return null;
+            }
+        }
+    }
+
+    private void addManifestList(List<String> used, Snapshot snapshot) {
+        used.add(snapshot.baseManifestList());
+        used.add(snapshot.deltaManifestList());
+        String changelogManifestList = snapshot.changelogManifestList();
+        if (changelogManifestList != null) {
+            used.add(changelogManifestList);
+        }
+    }
+
+    private <T> ReadingResult<T> retryReadingFiles(Supplier<List<T>> reader) {
+        int retryNumber = 0;
+        Exception caught = null;
+        while (retryNumber++ < READ_FILE_RETRY_NUM) {
+            try {
+                List<T> result = reader.get();
+                return ReadingResult.forResult(result);
+            } catch (Exception origin) {
+                caught = origin;
+            }
+            try {
+                TimeUnit.MILLISECONDS.sleep(READ_FILE_RETRY_INTERVAL);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+        return ReadingResult.forException(caught);
+    }
+
+    private ReadingResult<String> retryReadingDataFiles(
+            List<String> manifestNames, ManifestFile manifestFile) {
+        List<String> dataFiles = new ArrayList<>();
+        for (String manifestName : manifestNames) {
+            ReadingResult<ManifestEntry> readingResult =
+                    retryReadingFiles(() -> manifestFile.read(manifestName));
+            if (readingResult.hasException()) {
+                return ReadingResult.forException(readingResult.originalException());
+            }
+            readingResult.result().stream()
+                    .map(ManifestEntry::file)
+                    .forEach(
+                            f -> {
+                                dataFiles.add(f.fileName());
+                                dataFiles.addAll(f.extraFiles());
+                            });
+        }
+        return ReadingResult.forResult(dataFiles);
+    }
+
+    /**
+     * If the snapshot or tag being read still exists, throws an exception to stop the removal
+     * process.
+     */
+    private void checkSnapshotAndTag(
+            Snapshot currentSnapshot,
+            SnapshotManager snapshotManager,
+            TagManager tagManager,
+            SortedMap<Snapshot, String> tags,
+            @Nullable Exception optionalException)
+            throws IOException {
+        long id = currentSnapshot.id();
+        if (snapshotManager.snapshotExists(id)) {
+            throw new IOException(
+                    String.format(
+                            "Failed to read snapshot %s when trying to find used files. "
+                                    + "To avoid deleting used files, stop removing process.",
+                            id),
+                    optionalException);
+        } else {
+            String tagName = tags.get(currentSnapshot);
+            if (tagName != null && tagManager.tagExists(tagName)) {
+                throw new IOException(
+                        String.format(
+                                "Failed to read tag %s when trying to find used files. "
+                                        + "To avoid deleting used files, stop removing process.",
+                                tagName),
+                        optionalException);
+            }
+        }
+    }
+
+    /**
+     * Lists all Paimon files, keeps only those last modified before olderThanMillis, and stores
+     * them in the view ALL(path STRING, fileName STRING).
+     */
+    private void buildAllFilesView(FileStoreTable table) throws Exception {
+        RowTypeInfo rowTypeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {
+                            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+                        },
+                        new String[] {"path", "fileName"});
+
+        DataStream<Row> allFilesDataStream =
+                env.fromCollection(
+                                listPaimonFileDirs(
+                                        table.fileIO(),
+                                        table.location(),
+                                        table.partitionKeys().size()))
+                        .flatMap(new ListCandidates(table.fileIO(), olderThanMillis))
+                        .returns(rowTypeInfo);
+        batchTEnv.createTemporaryView("ALL", allFilesDataStream);

Review Comment:
   Can we just use `COMMON_IO_FORK_JOIN_POOL`? We don't need a Flink DataStream here.
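One way to follow this suggestion is to list the candidate directories in parallel on a shared `ForkJoinPool` instead of building a Flink `DataStream` and a temporary view. The sketch below is an illustration under assumptions, not Paimon code: it uses `java.nio.file` in place of Paimon's `FileIO`, takes the pool as a parameter (standing in for `COMMON_IO_FORK_JOIN_POOL`, whose definition is not shown in this thread), and expects the caller to pass the file-name prefixes (the PR's `PAIMON_FILE_PREFIX` list plays that role).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Sketch: list orphan candidates in parallel on a ForkJoinPool instead of a Flink DataStream. */
public class ParallelCandidateListing {

    /**
     * Lists, in parallel over {@code dirs}, the regular files whose names start with one of
     * {@code prefixes} and whose modification time is before {@code olderThanMillis}.
     */
    public static List<Path> listCandidates(
            ForkJoinPool pool, List<Path> dirs, List<String> prefixes, long olderThanMillis)
            throws InterruptedException, ExecutionException {
        // Submitting the parallel stream to `pool` runs its subtasks there
        // instead of in the JVM-wide common pool.
        return pool.submit(
                        () ->
                                dirs.parallelStream()
                                        .flatMap(
                                                dir ->
                                                        listOldFiles(dir, prefixes, olderThanMillis)
                                                                .stream())
                                        .collect(Collectors.toList()))
                .get();
    }

    private static List<Path> listOldFiles(
            Path dir, List<String> prefixes, long olderThanMillis) {
        // Files.list must be closed to release the directory handle.
        try (Stream<Path> children = Files.list(dir)) {
            return children.filter(Files::isRegularFile)
                    .filter(p -> hasPrefix(p.getFileName().toString(), prefixes))
                    .filter(p -> lastModifiedMillis(p) < olderThanMillis)
                    .collect(Collectors.toList());
        } catch (IOException e) {
            // Surfaces as an ExecutionException from get(), aborting the whole listing.
            throw new UncheckedIOException(e);
        }
    }

    private static boolean hasPrefix(String fileName, List<String> prefixes) {
        return prefixes.stream().anyMatch(fileName::startsWith);
    }

    private static long lastModifiedMillis(Path p) {
        try {
            return Files.getLastModifiedTime(p).toMillis();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Running a parallel stream inside `pool.submit(...)` makes its subtasks execute in that pool rather than in `ForkJoinPool.commonPool()`. This is a widely used idiom, but it relies on fork/join implementation behavior rather than a documented guarantee.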



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java:
##########
@@ -0,0 +1,522 @@
+    /**
+     * List all the used files by snapshots and tags, and store them in view USED(fileName STRING).
+     */
+    private void buildUsedFilesView(FileStoreTable table) throws IOException {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TagManager tagManager = table.tagManager();
+
+        long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+        long latestSnapshotId = snapshotManager.latestSnapshotId();
+
+        SortedMap<Snapshot, String> tags = tagManager.tags();
+        List<Snapshot> taggedSnapshots = new ArrayList<>(tags.keySet());
+        if (!tags.isEmpty()) {
+            earliestSnapshotId = Math.min(earliestSnapshotId, taggedSnapshots.get(0).id());
+            latestSnapshotId =
+                    Math.max(latestSnapshotId, taggedSnapshots.get(tags.size() - 1).id());
+        }
+
+        FileStore<?> store = table.store();
+        ManifestList manifestList = store.manifestListFactory().create();
+        ManifestFile manifestFile = store.manifestFileFactory().create();
+        IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+        Set<String> usedFiles = new HashSet<>();
+        for (long id = latestSnapshotId; id >= earliestSnapshotId; id--) {

Review Comment:
   Use `COMMON_IO_FORK_JOIN_POOL` to accelerate this.
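The same idea applies to this loop. Each snapshot ID in the tag-widened range can be resolved independently, so the per-snapshot work can be spread over a pool and the results unioned. A minimal sketch, assuming a hypothetical `filesOfSnapshot` callback that returns every file name a snapshot or tag references. It returns an empty list when the snapshot/tag no longer exists, matching the `continue` in the loop, and throws when a still-existing snapshot cannot be read.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.LongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

/** Sketch: resolve the files used by each snapshot ID in parallel and union the results. */
public class ParallelUsedFiles {

    /**
     * Scans every ID in [earliestId, latestId]; {@code filesOfSnapshot} is a hypothetical
     * callback returning the file names referenced by one snapshot or tag.
     */
    public static Set<String> collectUsedFiles(
            ForkJoinPool pool,
            long earliestId,
            long latestId,
            LongFunction<List<String>> filesOfSnapshot)
            throws InterruptedException, ExecutionException {
        return pool.submit(
                        () ->
                                LongStream.rangeClosed(earliestId, latestId)
                                        .parallel()
                                        // Missing snapshots/tags yield an empty list.
                                        .mapToObj(filesOfSnapshot)
                                        .flatMap(List::stream)
                                        // Snapshots share many files, so deduplicate.
                                        .collect(Collectors.toSet()))
                .get();
    }
}
```

Because `ForkJoinTask.get()` wraps any exception thrown by `filesOfSnapshot` in an `ExecutionException`, a read failure still aborts the whole scan. This preserves the fail-safe behavior described in the class javadoc.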



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java:
##########
@@ -0,0 +1,522 @@
+    @Override
+    public void run() throws Exception {
+        checkArgument(
+                table instanceof FileStoreTable,
+                "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
+                table.getClass().getName());
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+        if (snapshotManager.earliestSnapshotId() == null) {
+            LOG.info("No snapshot found, skip removing.");
+            return;
+        }
+
+        buildUsedFilesView(fileStoreTable);
+        buildAllFilesView(fileStoreTable);
+
+        // find not used files
+        String query =
+                "SELECT path FROM `ALL` WHERE `ALL`.fileName NOT IN (SELECT fileName FROM `USED`)";
+        CloseableIterator<Row> iterator = batchTEnv.executeSql(query).collect();
+
+        while (iterator.hasNext()) {
+            String fileName = iterator.next().getFieldAs(0);
+            fileStoreTable.fileIO().deleteQuietly(new Path(fileName));
+            deletedFileNums++;
+        }
+
+        iterator.close();
+    }
+
+    /**
+     * Lists the files used by snapshots and tags and stores them in view USED(fileName STRING).
+     */
+    private void buildUsedFilesView(FileStoreTable table) throws IOException {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TagManager tagManager = table.tagManager();
+
+        long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+        long latestSnapshotId = snapshotManager.latestSnapshotId();
+
+        SortedMap<Snapshot, String> tags = tagManager.tags();
+        List<Snapshot> taggedSnapshots = new ArrayList<>(tags.keySet());
+        if (!tags.isEmpty()) {
+            earliestSnapshotId = Math.min(earliestSnapshotId, taggedSnapshots.get(0).id());
+            latestSnapshotId =
+                    Math.max(latestSnapshotId, taggedSnapshots.get(tags.size() - 1).id());
+        }
+
+        FileStore<?> store = table.store();
+        ManifestList manifestList = store.manifestListFactory().create();
+        ManifestFile manifestFile = store.manifestFileFactory().create();
+        IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+        Set<String> usedFiles = new HashSet<>();
+        for (long id = latestSnapshotId; id >= earliestSnapshotId; id--) {

Review Comment:
   Just construct a Set<Snapshot> containing both snapshots and tags.
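A minimal sketch of what this suggestion could look like, in Java. `SnapshotRef` and `SnapshotCollector` are hypothetical stand-ins rather than Paimon classes, and the sketch assumes snapshots can be deduplicated by id; in the action, the inputs would be the snapshots that still exist plus `tags.keySet()`. Iterating such a set avoids probing every id between the earliest and latest snapshot, many of which may already be expired when a tag points far behind the earliest snapshot.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for org.apache.paimon.Snapshot; only the id matters here. */
final class SnapshotRef {
    final long id;
    final String source; // "snapshot" or "tag", for illustration only

    SnapshotRef(long id, String source) {
        this.id = id;
        this.source = source;
    }
}

final class SnapshotCollector {
    private SnapshotCollector() {}

    /**
     * Merges live snapshots and tagged snapshots into one set, deduplicated by snapshot id, so
     * the caller visits only snapshots that exist instead of every id in [earliest, latest].
     */
    static Set<SnapshotRef> collect(
            Collection<SnapshotRef> live, Collection<SnapshotRef> tagged) {
        // Latest first, mirroring the original loop; equal ids count as duplicates.
        Set<SnapshotRef> all =
                new TreeSet<>(Comparator.comparingLong((SnapshotRef s) -> s.id).reversed());
        all.addAll(live);
        // A tag pointing at a still-live snapshot id is ignored: the live entry was added first.
        all.addAll(tagged);
        return all;
    }
}
```

A `TreeSet` keyed on the id keeps the latest-to-earliest order of the original loop; a plain `HashSet<Snapshot>` would also work if `Snapshot` defines `equals`/`hashCode`, which this sketch does not assume.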



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java:
##########
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Removes orphan data files and metadata files.
+ *
+ * <p>To avoid deleting newly written files, it only deletes orphan files older than 1 day by
+ * default. The interval can be modified by {@link #olderThan}.
+ *
+ * <p>To avoid conflicting with snapshot expiration, tag deletion and rollback, it skips any
+ * snapshot or tag that no longer exists when listing used files.
+ *
+ * <p>To avoid mistakenly deleting files that are in use but could not be read, it stops the
+ * removal process when it fails to read the used files.
+ *
+ * <p>Some ideas are borrowed from Iceberg's {@code DeleteOrphanFilesSparkAction}.
+ */
+public class RemoveOrphanFilesAction extends TableActionBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
+
+    private static final List<String> PAIMON_FILE_PREFIX =
+            Arrays.asList(
+                    DataFilePathFactory.DATA_FILE_PREFIX,
+                    DataFilePathFactory.CHANGELOG_FILE_PREFIX,
+                    FileStorePathFactory.MANIFEST_FILE_PREFIX,
+                    FileStorePathFactory.MANIFEST_LIST_PREFIX,
+                    FileStorePathFactory.INDEX_MANIFEST_PREFIX,
+                    FileStorePathFactory.INDEX_FILE_PREFIX);
+
+    private static final int READ_FILE_RETRY_NUM = 3;
+    private static final int READ_FILE_RETRY_INTERVAL = 5;
+
+    private long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
+    private int deletedFileNums;
+
+    RemoveOrphanFilesAction(
+            String warehouse,
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig) {
+        super(warehouse, databaseName, tableName, catalogConfig);
+    }
+
+    public RemoveOrphanFilesAction olderThan(String timestamp) {
+        // The FileStatus#getModificationTime returns milliseconds
+        this.olderThanMillis = DateTimeUtils.parseTimestampData(timestamp, 3).getMillisecond();
+        return this;
+    }
+
+    @VisibleForTesting
+    public int getDeletedFileNums() {
+        return deletedFileNums;
+    }
+
+    @Override
+    public void run() throws Exception {
+        checkArgument(
+                table instanceof FileStoreTable,
+                "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
+                table.getClass().getName());
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+        if (snapshotManager.earliestSnapshotId() == null) {
+            LOG.info("No snapshot found, skip removing.");
+            return;
+        }
+
+        buildUsedFilesView(fileStoreTable);
+        buildAllFilesView(fileStoreTable);
+
+        // find not used files
+        String query =
+                "SELECT path FROM `ALL` WHERE `ALL`.fileName NOT IN (SELECT fileName FROM `USED`)";
+        CloseableIterator<Row> iterator = batchTEnv.executeSql(query).collect();
+
+        while (iterator.hasNext()) {
+            String fileName = iterator.next().getFieldAs(0);
+            fileStoreTable.fileIO().deleteQuietly(new Path(fileName));
+            deletedFileNums++;
+        }
+
+        iterator.close();
+    }
+
+    /**
+     * Lists the files used by snapshots and tags and stores them in view USED(fileName STRING).
+     */
+    private void buildUsedFilesView(FileStoreTable table) throws IOException {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TagManager tagManager = table.tagManager();
+
+        long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+        long latestSnapshotId = snapshotManager.latestSnapshotId();
+
+        SortedMap<Snapshot, String> tags = tagManager.tags();
+        List<Snapshot> taggedSnapshots = new ArrayList<>(tags.keySet());
+        if (!tags.isEmpty()) {
+            earliestSnapshotId = Math.min(earliestSnapshotId, taggedSnapshots.get(0).id());
+            latestSnapshotId =
+                    Math.max(latestSnapshotId, taggedSnapshots.get(tags.size() - 1).id());
+        }
+
+        FileStore<?> store = table.store();
+        ManifestList manifestList = store.manifestListFactory().create();
+        ManifestFile manifestFile = store.manifestFileFactory().create();
+        IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+        Set<String> usedFiles = new HashSet<>();
+        for (long id = latestSnapshotId; id >= earliestSnapshotId; id--) {
+            Snapshot currentSnapshot = findSnapshot(snapshotManager, tagManager, tags, id);
+
+            if (currentSnapshot == null) {
+                continue;
+            }
+
+            List<String> usedByCurrent = new ArrayList<>();
+            addManifestList(usedByCurrent, currentSnapshot);
+
+            // try to read manifests
+            ReadingResult<ManifestFileMeta> manifestFileMatas =
+                    retryReadingFiles(() -> currentSnapshot.allManifests(manifestList));
+            if (manifestFileMatas.hasException()) {
+                checkSnapshotAndTag(
+                        currentSnapshot,
+                        snapshotManager,
+                        tagManager,
+                        tags,
+                        manifestFileMatas.originalException());
+                continue;
+            }
+            List<String> manifestFileName =
+                    manifestFileMatas.result().stream()
+                            .map(ManifestFileMeta::fileName)
+                            .collect(Collectors.toList());
+            usedByCurrent.addAll(manifestFileName);
+
+            // try to read data files
+            ReadingResult<String> dataFiles = retryReadingDataFiles(manifestFileName, manifestFile);
+            if (dataFiles.hasException()) {
+                checkSnapshotAndTag(
+                        currentSnapshot,
+                        snapshotManager,
+                        tagManager,
+                        tags,
+                        dataFiles.originalException());
+                continue;
+            }
+            usedByCurrent.addAll(dataFiles.result());
+
+            // try to read index files
+            String indexManifest = currentSnapshot.indexManifest();
+            if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
+                usedByCurrent.add(indexManifest);
+
+                ReadingResult<IndexManifestEntry> indexManifestEntries =
+                        retryReadingFiles(() -> indexFileHandler.readManifest(indexManifest));
+                if (indexManifestEntries.hasException()) {
+                    checkSnapshotAndTag(
+                            currentSnapshot,
+                            snapshotManager,
+                            tagManager,
+                            tags,
+                            indexManifestEntries.originalException());
+                    continue;
+                }
+
+                indexManifestEntries.result().stream()
+                        .map(IndexManifestEntry::indexFile)
+                        .map(IndexFileMeta::fileName)
+                        .forEach(usedByCurrent::add);
+            }
+
+            usedFiles.addAll(usedByCurrent);
+        }
+
+        RowTypeInfo rowTypeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO},
+                        new String[] {"fileName"});
+        DataStream<Row> usedFileDataStream =
+                env.fromCollection(usedFiles)
+                        .flatMap(
+                                (FlatMapFunction<String, Row>)
+                                        (value, out) -> out.collect(Row.of(value)))
+                        .returns(rowTypeInfo);
+        batchTEnv.createTemporaryView("USED", usedFileDataStream);
+    }
+
+    @Nullable
+    private Snapshot findSnapshot(
+            SnapshotManager snapshotManager,
+            TagManager tagManager,
+            SortedMap<Snapshot, String> tags,
+            long targetId) {
+        if (snapshotManager.snapshotExists(targetId)) {
+            return snapshotManager.snapshot(targetId);
+        } else {
+            Snapshot snapshot = TagManager.find(tags, targetId);
+            if (snapshot != null && tagManager.tagExists(tags.get(snapshot))) {
+                return snapshot;
+            } else {
+                return null;
+            }
+        }
+    }
+
+    private void addManifestList(List<String> used, Snapshot snapshot) {
+        used.add(snapshot.baseManifestList());
+        used.add(snapshot.deltaManifestList());
+        String changelogManifestList = snapshot.changelogManifestList();
+        if (changelogManifestList != null) {
+            used.add(changelogManifestList);
+        }
+    }
+
+    private <T> ReadingResult<T> retryReadingFiles(Supplier<List<T>> reader) {
+        int retryNumber = 0;
+        Exception caught = null;
+        while (retryNumber++ < READ_FILE_RETRY_NUM) {
+            try {
+                List<T> result = reader.get();
+                return ReadingResult.forResult(result);
+            } catch (Exception origin) {
+                caught = origin;
+            }
+            try {
+                TimeUnit.MILLISECONDS.sleep(READ_FILE_RETRY_INTERVAL);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+        return ReadingResult.forException(caught);

Review Comment:
   Just skip FileNotFoundException.
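One possible reading of this suggestion, sketched in Java: rather than retrying every failure and then re-checking whether the snapshot or tag still exists, treat a missing file as "skip this snapshot" and let every other error fail the action. `IOListReader` and `SkipMissingFiles` are hypothetical names; treating `java.nio.file.NoSuchFileException` as a missing file too is an assumption about what the underlying `FileIO` may throw.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.NoSuchFileException;
import java.util.List;
import java.util.Optional;

/** Hypothetical reader interface standing in for the manifest/index readers. */
@FunctionalInterface
interface IOListReader<T> {
    List<T> read() throws IOException;
}

final class SkipMissingFiles {
    private SkipMissingFiles() {}

    /** True if the throwable signals that the file does not exist. */
    static boolean isMissingFile(Throwable t) {
        return t instanceof FileNotFoundException || t instanceof NoSuchFileException;
    }

    /**
     * Reads once. A missing file (e.g. its snapshot or tag was expired concurrently) yields
     * Optional.empty() so the caller can skip that snapshot; any other failure is rethrown so
     * files that are in use but unreadable are never treated as orphans.
     */
    static <T> Optional<List<T>> readOrSkip(IOListReader<T> reader) {
        try {
            return Optional.of(reader.read());
        } catch (IOException e) {
            if (isMissingFile(e)) {
                return Optional.empty();
            }
            throw new UncheckedIOException(e);
        } catch (UncheckedIOException e) {
            // Some readers wrap IO errors in an unchecked exception; unwrap before deciding.
            if (isMissingFile(e.getCause())) {
                return Optional.empty();
            }
            throw e;
        }
    }
}
```

Failing fast on anything other than a missing file keeps the safety property from the class Javadoc: if a used file cannot be read, nothing is deleted.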



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java:
##########
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Removes orphan data files and metadata files.
+ *
+ * <p>To avoid deleting newly written files, it only deletes orphan files older than 1 day by
+ * default. The interval can be modified by {@link #olderThan}.
+ *
+ * <p>To avoid conflicting with snapshot expiration, tag deletion and rollback, it skips any
+ * snapshot or tag that no longer exists when listing used files.
+ *
+ * <p>To avoid mistakenly deleting files that are in use but could not be read, it stops the
+ * removal process when it fails to read the used files.
+ *
+ * <p>Some ideas are borrowed from Iceberg's {@code DeleteOrphanFilesSparkAction}.
+ */
+public class RemoveOrphanFilesAction extends TableActionBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
+
+    private static final List<String> PAIMON_FILE_PREFIX =
+            Arrays.asList(
+                    DataFilePathFactory.DATA_FILE_PREFIX,
+                    DataFilePathFactory.CHANGELOG_FILE_PREFIX,
+                    FileStorePathFactory.MANIFEST_FILE_PREFIX,
+                    FileStorePathFactory.MANIFEST_LIST_PREFIX,
+                    FileStorePathFactory.INDEX_MANIFEST_PREFIX,
+                    FileStorePathFactory.INDEX_FILE_PREFIX);
+
+    private static final int READ_FILE_RETRY_NUM = 3;
+    private static final int READ_FILE_RETRY_INTERVAL = 5;
+
+    private long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
+    private int deletedFileNums;
+
+    RemoveOrphanFilesAction(
+            String warehouse,
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig) {
+        super(warehouse, databaseName, tableName, catalogConfig);
+    }
+
+    public RemoveOrphanFilesAction olderThan(String timestamp) {
+        // The FileStatus#getModificationTime returns milliseconds
+        this.olderThanMillis = DateTimeUtils.parseTimestampData(timestamp, 3).getMillisecond();
+        return this;
+    }
+
+    @VisibleForTesting
+    public int getDeletedFileNums() {
+        return deletedFileNums;
+    }
+
+    @Override
+    public void run() throws Exception {
+        checkArgument(
+                table instanceof FileStoreTable,
+                "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
+                table.getClass().getName());
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+        if (snapshotManager.earliestSnapshotId() == null) {
+            LOG.info("No snapshot found, skip removing.");
+            return;
+        }
+
+        buildUsedFilesView(fileStoreTable);
+        buildAllFilesView(fileStoreTable);
+
+        // find not used files
+        String query =
+                "SELECT path FROM `ALL` WHERE `ALL`.fileName NOT IN (SELECT fileName FROM `USED`)";
+        CloseableIterator<Row> iterator = batchTEnv.executeSql(query).collect();
+
+        while (iterator.hasNext()) {
+            String fileName = iterator.next().getFieldAs(0);
+            fileStoreTable.fileIO().deleteQuietly(new Path(fileName));
+            deletedFileNums++;
+        }
+
+        iterator.close();

Review Comment:
   Delete snapshot temporary files too.
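A minimal sketch of the filtering step for this suggestion, in Java. It assumes an interrupted snapshot commit leaves a temporary file in the snapshot directory whose name starts with `.` and ends with `.tmp`; that naming pattern and the `FileEntry`/`SnapshotTempFiles` names are assumptions for illustration. In the action, the listing would presumably come from `FileIO#listStatus` on the snapshot directory, with `FileStatus#getModificationTime` supplying the age.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical listing entry: a file name plus its modification time in milliseconds. */
final class FileEntry {
    final String name;
    final long modificationTimeMillis;

    FileEntry(String name, long modificationTimeMillis) {
        this.name = name;
        this.modificationTimeMillis = modificationTimeMillis;
    }
}

final class SnapshotTempFiles {
    private SnapshotTempFiles() {}

    /**
     * Returns the names of snapshot-directory entries that look like leftover temporary files
     * (assumed pattern: leading '.' and trailing ".tmp") and are older than the cutoff.
     */
    static List<String> selectExpiredTempFiles(List<FileEntry> listing, long olderThanMillis) {
        List<String> result = new ArrayList<>();
        for (FileEntry entry : listing) {
            boolean looksTemporary = entry.name.startsWith(".") && entry.name.endsWith(".tmp");
            // Same age guard as for data files, so an in-flight commit is not disturbed.
            if (looksTemporary && entry.modificationTimeMillis < olderThanMillis) {
                result.add(entry.name);
            }
        }
        return result;
    }
}
```

Reusing the `olderThanMillis` cutoff keeps temporary snapshot files under the same "older than 1 day by default" rule as the other orphan files.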



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java:
##########
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Removes orphan data files and metadata files.
+ *
+ * <p>To avoid deleting newly written files, it only deletes orphan files older than 1 day by
+ * default. The interval can be modified by {@link #olderThan}.
+ *
+ * <p>To avoid conflicting with snapshot expiration, tag deletion and rollback, it skips any
+ * snapshot or tag that no longer exists when listing used files.
+ *
+ * <p>To avoid mistakenly deleting files that are in use but could not be read, it stops the
+ * removal process when it fails to read the used files.
+ *
+ * <p>Some ideas are borrowed from Iceberg's {@code DeleteOrphanFilesSparkAction}.
+ */
+public class RemoveOrphanFilesAction extends TableActionBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
+
+    private static final List<String> PAIMON_FILE_PREFIX =
+            Arrays.asList(
+                    DataFilePathFactory.DATA_FILE_PREFIX,
+                    DataFilePathFactory.CHANGELOG_FILE_PREFIX,
+                    FileStorePathFactory.MANIFEST_FILE_PREFIX,
+                    FileStorePathFactory.MANIFEST_LIST_PREFIX,
+                    FileStorePathFactory.INDEX_MANIFEST_PREFIX,
+                    FileStorePathFactory.INDEX_FILE_PREFIX);
+
+    private static final int READ_FILE_RETRY_NUM = 3;
+    private static final int READ_FILE_RETRY_INTERVAL = 5;
+
+    private long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
+    private int deletedFileNums;
+
+    RemoveOrphanFilesAction(
+            String warehouse,
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig) {
+        super(warehouse, databaseName, tableName, catalogConfig);
+    }
+
+    public RemoveOrphanFilesAction olderThan(String timestamp) {
+        // The FileStatus#getModificationTime returns milliseconds
+        this.olderThanMillis = DateTimeUtils.parseTimestampData(timestamp, 3).getMillisecond();
+        return this;
+    }
+
+    @VisibleForTesting
+    public int getDeletedFileNums() {
+        return deletedFileNums;
+    }
+
+    @Override
+    public void run() throws Exception {
+        checkArgument(
+                table instanceof FileStoreTable,
+                "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
+                table.getClass().getName());
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+        if (snapshotManager.earliestSnapshotId() == null) {
+            LOG.info("No snapshot found, skip removing.");
+            return;
+        }
+
+        buildUsedFilesView(fileStoreTable);
+        buildAllFilesView(fileStoreTable);
+
+        // find not used files
+        String query =
+                "SELECT path FROM `ALL` WHERE `ALL`.fileName NOT IN (SELECT fileName FROM `USED`)";
+        CloseableIterator<Row> iterator = batchTEnv.executeSql(query).collect();
+
+        while (iterator.hasNext()) {
+            String fileName = iterator.next().getFieldAs(0);
+            fileStoreTable.fileIO().deleteQuietly(new Path(fileName));
+            deletedFileNums++;
+        }
+
+        iterator.close();
+    }
+
+    /**
+     * Lists the files used by snapshots and tags and stores them in view USED(fileName STRING).
+     */
+    private void buildUsedFilesView(FileStoreTable table) throws IOException {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TagManager tagManager = table.tagManager();
+
+        long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+        long latestSnapshotId = snapshotManager.latestSnapshotId();
+
+        SortedMap<Snapshot, String> tags = tagManager.tags();
+        List<Snapshot> taggedSnapshots = new ArrayList<>(tags.keySet());
+        if (!tags.isEmpty()) {
+            earliestSnapshotId = Math.min(earliestSnapshotId, taggedSnapshots.get(0).id());
+            latestSnapshotId =
+                    Math.max(latestSnapshotId, taggedSnapshots.get(tags.size() - 1).id());
+        }
+
+        FileStore<?> store = table.store();
+        ManifestList manifestList = store.manifestListFactory().create();
+        ManifestFile manifestFile = store.manifestFileFactory().create();
+        IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+        Set<String> usedFiles = new HashSet<>();
+        for (long id = latestSnapshotId; id >= earliestSnapshotId; id--) {
+            Snapshot currentSnapshot = findSnapshot(snapshotManager, tagManager, tags, id);
+
+            if (currentSnapshot == null) {
+                continue;
+            }
+
+            List<String> usedByCurrent = new ArrayList<>();
+            addManifestList(usedByCurrent, currentSnapshot);
+
+            // try to read manifests
+            ReadingResult<ManifestFileMeta> manifestFileMatas =
+                    retryReadingFiles(() -> currentSnapshot.allManifests(manifestList));
+            if (manifestFileMatas.hasException()) {
+                checkSnapshotAndTag(
+                        currentSnapshot,
+                        snapshotManager,
+                        tagManager,
+                        tags,
+                        manifestFileMatas.originalException());
+                continue;
+            }
+            List<String> manifestFileName =
+                    manifestFileMatas.result().stream()
+                            .map(ManifestFileMeta::fileName)
+                            .collect(Collectors.toList());
+            usedByCurrent.addAll(manifestFileName);
+
+            // try to read data files
+            ReadingResult<String> dataFiles = retryReadingDataFiles(manifestFileName, manifestFile);
+            if (dataFiles.hasException()) {
+                checkSnapshotAndTag(
+                        currentSnapshot,
+                        snapshotManager,
+                        tagManager,
+                        tags,
+                        dataFiles.originalException());
+                continue;
+            }
+            usedByCurrent.addAll(dataFiles.result());
+
+            // try to read index files
+            String indexManifest = currentSnapshot.indexManifest();
+            if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
+                usedByCurrent.add(indexManifest);
+
+                ReadingResult<IndexManifestEntry> indexManifestEntries =
+                        retryReadingFiles(() -> indexFileHandler.readManifest(indexManifest));
+                if (indexManifestEntries.hasException()) {
+                    checkSnapshotAndTag(
+                            currentSnapshot,
+                            snapshotManager,
+                            tagManager,
+                            tags,
+                            indexManifestEntries.originalException());
+                    continue;
+                }
+
+                indexManifestEntries.result().stream()
+                        .map(IndexManifestEntry::indexFile)
+                        .map(IndexFileMeta::fileName)
+                        .forEach(usedByCurrent::add);
+            }
+
+            usedFiles.addAll(usedByCurrent);
+        }
+
+        RowTypeInfo rowTypeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO},
+                        new String[] {"fileName"});
+        DataStream<Row> usedFileDataStream =
+                env.fromCollection(usedFiles)
+                        .flatMap(
+                                (FlatMapFunction<String, Row>)
+                                        (value, out) -> out.collect(Row.of(value)))
+                        .returns(rowTypeInfo);
+        batchTEnv.createTemporaryView("USED", usedFileDataStream);
+    }
+
+    @Nullable
+    private Snapshot findSnapshot(
+            SnapshotManager snapshotManager,
+            TagManager tagManager,
+            SortedMap<Snapshot, String> tags,
+            long targetId) {
+        if (snapshotManager.snapshotExists(targetId)) {
+            return snapshotManager.snapshot(targetId);
+        } else {
+            Snapshot snapshot = TagManager.find(tags, targetId);
+            if (snapshot != null && tagManager.tagExists(tags.get(snapshot))) {
+                return snapshot;
+            } else {
+                return null;
+            }
+        }
+    }
+
+    private void addManifestList(List<String> used, Snapshot snapshot) {
+        used.add(snapshot.baseManifestList());
+        used.add(snapshot.deltaManifestList());
+        String changelogManifestList = snapshot.changelogManifestList();
+        if (changelogManifestList != null) {
+            used.add(changelogManifestList);
+        }
+    }
+
+    private <T> ReadingResult<T> retryReadingFiles(Supplier<List<T>> reader) {
+        int retryNumber = 0;
+        Exception caught = null;
+        while (retryNumber++ < READ_FILE_RETRY_NUM) {
+            try {
+                List<T> result = reader.get();
+                return ReadingResult.forResult(result);
+            } catch (Exception origin) {
+                caught = origin;
+            }
+            try {
+                TimeUnit.MILLISECONDS.sleep(READ_FILE_RETRY_INTERVAL);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+        return ReadingResult.forException(caught);
+    }
+
+    private ReadingResult<String> retryReadingDataFiles(
+            List<String> manifestNames, ManifestFile manifestFile) {
+        List<String> dataFiles = new ArrayList<>();
+        for (String manifestName : manifestNames) {
+            ReadingResult<ManifestEntry> readingResult =
+                    retryReadingFiles(() -> manifestFile.read(manifestName));
+            if (readingResult.hasException()) {
+                return ReadingResult.forException(readingResult.originalException());
+            }
+            readingResult.result().stream()
+                    .map(ManifestEntry::file)
+                    .forEach(
+                            f -> {
+                                dataFiles.add(f.fileName());
+                                dataFiles.addAll(f.extraFiles());
+                            });
+        }
+        return ReadingResult.forResult(dataFiles);
+    }
+
+    /**
+     * If the snapshot/tag being read still exists, throw an exception to stop the removal
+     * process.
+     */
+    private void checkSnapshotAndTag(
+            Snapshot currentSnapshot,
+            SnapshotManager snapshotManager,
+            TagManager tagManager,
+            SortedMap<Snapshot, String> tags,
+            @Nullable Exception optionalException)
+            throws IOException {
+        long id = currentSnapshot.id();
+        if (snapshotManager.snapshotExists(id)) {
+            throw new IOException(
+                    String.format(
+                            "Failed to read snapshot %s when trying to find used files. "
+                                    + "To avoid deleting used files, stop removing process.",
+                            id),
+                    optionalException);
+        } else {
+            String tagName = tags.get(currentSnapshot);
+            if (tagName != null && tagManager.tagExists(tagName)) {
+                throw new IOException(
+                        String.format(
+                                "Failed to read tag %s when trying to find used files. "
+                                        + "To avoid deleting used files, stop removing process.",
+                                tagName),
+                        optionalException);
+            }
+        }
+    }
+
+    /**
+     * List all the Paimon files, keep only those modified before olderThanMillis, and store
+     * them in view ALL(path STRING, fileName STRING).
+     */
+    private void buildAllFilesView(FileStoreTable table) throws Exception {
+        RowTypeInfo rowTypeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {
+                            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+                        },
+                        new String[] {"path", "fileName"});
+
+        DataStream<Row> allFilesDataStream =
+                env.fromCollection(
+                                listPaimonFileDirs(
+                                        table.fileIO(),
+                                        table.location(),
+                                        table.partitionKeys().size()))
+                        .flatMap(new ListCandidates(table.fileIO(), olderThanMillis))
+                        .returns(rowTypeInfo);
+        batchTEnv.createTemporaryView("ALL", allFilesDataStream);
+    }
+
+    /** List directories that contain data files, manifest files and index files. */
+    private List<Path> listPaimonFileDirs(FileIO fileIO, Path location, int partitionKeyNums)
+            throws Exception {
+        List<Path> paimonFileDirs = new ArrayList<>();
+
+        paimonFileDirs.add(new Path(location, "manifest"));
+        paimonFileDirs.add(new Path(location, "index"));
+        paimonFileDirs.addAll(listDataDirs(fileIO, location, partitionKeyNums));
+
+        return paimonFileDirs;
+    }
+
+    /**
+     * List directories that contain data files. The argument level controls the recursion
+     * depth, i.e. the number of partition levels still to descend.
+     */
+    private List<Path> listDataDirs(FileIO fileIO, Path dir, int level) throws Exception {
+        if (level == 0) {
+            // list bucket paths
+            List<FileStatus> status = retryListingDirs(fileIO, dir);
+
+            return status.stream()
+                    .filter(FileStatus::isDir)
+                    .map(FileStatus::getPath)
+                    .filter(p -> p.getName().startsWith(DataFilePathFactory.BUCKET_PATH_PREFIX))
+                    .collect(Collectors.toList());
+        }
+
+        List<FileStatus> status = retryListingDirs(fileIO, dir);
+
+        List<Path> partitionPaths =
+                status.stream()
+                        .filter(FileStatus::isDir)
+                        .map(FileStatus::getPath)
+                        .filter(p -> p.getName().contains("="))
+                        .collect(Collectors.toList());
+
+        // dive into the next partition level
+        List<Path> results = new ArrayList<>();
+        for (Path path : partitionPaths) {
+            results.addAll(listDataDirs(fileIO, path, level - 1));
+        }
+
+        return results;
+    }
+
+    private List<FileStatus> retryListingDirs(FileIO fileIO, Path dir) throws Exception {
+        ReadingResult<FileStatus> statuses =
+                retryReadingFiles(
+                        () -> {
+                            FileStatus[] s;
+                            try {
+                                s = fileIO.listStatus(dir);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                            if (s == null) {
+                                throw new RuntimeException(
+                                        "Failed to listStatus for data directories of path " + dir);
+                            }
+                            return Arrays.asList(s);
+                        });
+
+        if (statuses.hasException()) {
+            throw statuses.originalException();
+        }
+
+        return statuses.result();
+    }
+
+    private static class ReadingResult<T> {
+
+        @Nullable private final List<T> result;
+        @Nullable private final Exception originalException;
+        private final boolean hasException;
+
+        ReadingResult(@Nullable List<T> result, @Nullable Exception originalException) {
+            checkArgument(!(result == null && originalException == null));
+            this.result = result;
+            this.originalException = originalException;
+            this.hasException = originalException != null;
+        }
+
+        List<T> result() {
+            return result;
+        }
+
+        Exception originalException() {
+            return originalException;
+        }
+
+        boolean hasException() {
+            return hasException;
+        }
+
+        static <T> ReadingResult<T> forResult(List<T> result) {
+            return new ReadingResult<>(result, null);
+        }
+
+        static <T> ReadingResult<T> forException(Exception e) {
+            return new ReadingResult<>(null, e);
+        }
+    }
+
+    /** List candidate files that may be deleted. */
+    private static class ListCandidates implements FlatMapFunction<Path, Row> {
+
+        private final FileIO fileIO;
+        private final long olderThanMillis;
+
+        ListCandidates(FileIO fileIO, long olderThanMillis) {
+            this.fileIO = fileIO;
+            this.olderThanMillis = olderThanMillis;
+        }
+
+        @Override
+        public void flatMap(Path dir, Collector<Row> out) throws Exception {
+            if (!fileIO.exists(dir)) {
+                return;
+            }
+
+            FileStatus[] status = fileIO.listStatus(dir);
+            checkNotNull(
+                    status,
+                    "The return value is null of the listStatus for the '%s' directory.",
+                    dir);
+
+            Arrays.stream(status)
+                    .filter(s -> !s.isDir())
+                    .filter(this::oldEnough)
+                    .map(FileStatus::getPath)
+                    .filter(this::isPaimonFile)
+                    .map(path -> Row.of(path.getPath(), path.getName()))
+                    .forEach(out::collect);
+        }
+
+        private boolean oldEnough(FileStatus status) {
+            return status.getModificationTime() < olderThanMillis;
+        }
+
+        private boolean isPaimonFile(Path path) {

Review Comment:
   Delete this method; we can just delete all unused files.
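One way to read this suggestion: drop the file-name prefix check (`isPaimonFile` and `PAIMON_FILE_PREFIX`) so that every file under the listed directories that is older than `olderThanMillis` and not referenced by any snapshot or tag becomes a deletion candidate. The sketch below models that decision with plain Java collections in place of the Flink `ALL`/`USED` views and the `NOT IN` query; `ListedFile` and `OrphanFileSketch` are hypothetical names, not part of Paimon or this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for one listed file: full path, file name, modification time (ms). */
final class ListedFile {
    final String path;
    final String name;
    final long modificationTime;

    ListedFile(String path, String name, long modificationTime) {
        this.path = path;
        this.name = name;
        this.modificationTime = modificationTime;
    }
}

/** Hypothetical helper illustrating "ALL minus USED" without any file-name prefix filter. */
final class OrphanFileSketch {
    private OrphanFileSketch() {}

    /**
     * Returns the paths of files that are older than the threshold and whose names are not
     * referenced by any snapshot or tag. No prefix check: any unreferenced old file qualifies.
     */
    static List<String> findOrphans(
            List<ListedFile> listed, Set<String> usedFileNames, long olderThanMillis) {
        List<String> orphans = new ArrayList<>();
        for (ListedFile f : listed) {
            // Same age rule as ListCandidates#oldEnough: strictly before the threshold.
            boolean oldEnough = f.modificationTime < olderThanMillis;
            // Same matching rule as the NOT IN query: compare by file name, return the path.
            if (oldEnough && !usedFileNames.contains(f.name)) {
                orphans.add(f.path);
            }
        }
        return orphans;
    }
}
```

As in the PR's query, files are matched on file name only. Because only the directories returned by `listPaimonFileDirs` (manifest, index and bucket directories) are scanned, files outside those directories are never candidates even without the prefix filter.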



##########
paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java:
##########
@@ -236,4 +238,14 @@ private static int findPreviousOrEqualTag(
         }
         return -1;
     }
+
+    @Nullable
+    public static Snapshot find(SortedMap<Snapshot, String> tags, long targetSnapshotId) {

Review Comment:
   Remove this method.
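One possible reading of this comment: instead of probing each id in `[earliestSnapshotId, latestSnapshotId]` and falling back to `TagManager.find`, the action could treat tagged snapshots as a separate input. This also matters because the loop in `buildUsedFilesView` only visits ids inside that range, so a tag whose snapshot is older than `earliestSnapshotId` (already expired) is never read, and files referenced only by that tag would be treated as orphans. A minimal sketch, with plain `long` ids and a `LongPredicate` as hypothetical stand-ins for `SnapshotManager` and the tag map; `SnapshotScanPlan` is not a Paimon class.

```java
import java.util.Collection;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.function.LongPredicate;

/** Hypothetical helper deciding which snapshot ids must be scanned for used files. */
final class SnapshotScanPlan {
    private SnapshotScanPlan() {}

    /**
     * Returns, newest first, the ids of every snapshot whose files must be kept: snapshots that
     * still exist in [earliestId, latestId], plus every tagged snapshot, including tags on
     * snapshots that were already expired (id < earliestId).
     */
    static NavigableSet<Long> idsToScan(
            long earliestId,
            long latestId,
            LongPredicate snapshotExists,
            Collection<Long> taggedSnapshotIds) {
        TreeSet<Long> ids = new TreeSet<>();
        for (long id = earliestId; id <= latestId; id++) {
            if (snapshotExists.test(id)) {
                ids.add(id);
            }
        }
        // Tagged snapshots are added directly, so no per-id lookup into the tag map is needed.
        ids.addAll(taggedSnapshotIds);
        return ids.descendingSet();
    }
}
```

The caller would then read each id either from the snapshot directory or from its tag; deduplicating through a sorted set keeps each snapshot from being read twice when it is both live and tagged.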



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java:
##########
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Removes orphan data files and metadata files.
+ *
+ * <p>To avoid deleting newly written files, it only deletes orphan files older than 1 day by
+ * default. The threshold can be changed with {@link #olderThan}.
+ *
+ * <p>To avoid conflicting with snapshot expiration, tag deletion and rollback, it skips
+ * snapshots/tags that no longer exist when listing used files.
+ *
+ * <p>To avoid mistakenly deleting files that are in use but could not be read, it stops the
+ * removal process when it fails to read used files.
+ * <p>Some ideas refer to Iceberg's {@code DeleteOrphanFilesSparkAction}.
+ */
+public class RemoveOrphanFilesAction extends TableActionBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
+
+    private static final List<String> PAIMON_FILE_PREFIX =
+            Arrays.asList(
+                    DataFilePathFactory.DATA_FILE_PREFIX,
+                    DataFilePathFactory.CHANGELOG_FILE_PREFIX,
+                    FileStorePathFactory.MANIFEST_FILE_PREFIX,
+                    FileStorePathFactory.MANIFEST_LIST_PREFIX,
+                    FileStorePathFactory.INDEX_MANIFEST_PREFIX,
+                    FileStorePathFactory.INDEX_FILE_PREFIX);
+
+    private static final int READ_FILE_RETRY_NUM = 3;
+    private static final int READ_FILE_RETRY_INTERVAL = 5;
+
+    private long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
+    private int deletedFileNums;
+
+    RemoveOrphanFilesAction(
+            String warehouse,
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig) {
+        super(warehouse, databaseName, tableName, catalogConfig);
+    }
+
+    public RemoveOrphanFilesAction olderThan(String timestamp) {
+        // The FileStatus#getModificationTime returns milliseconds
+        this.olderThanMillis = DateTimeUtils.parseTimestampData(timestamp, 3).getMillisecond();
+        return this;
+    }
+
+    @VisibleForTesting
+    public int getDeletedFileNums() {
+        return deletedFileNums;
+    }
+
+    @Override
+    public void run() throws Exception {
+        checkArgument(
+                table instanceof FileStoreTable,
+                "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
+                table.getClass().getName());
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+        if (snapshotManager.earliestSnapshotId() == null) {
+            LOG.info("No snapshot found, skip removing.");
+            return;
+        }
+
+        buildUsedFilesView(fileStoreTable);
+        buildAllFilesView(fileStoreTable);
+
+        // find unused files
+        String query =
+                "SELECT path FROM `ALL` WHERE `ALL`.fileName NOT IN (SELECT fileName FROM `USED`)";
+        CloseableIterator<Row> iterator = batchTEnv.executeSql(query).collect();
+
+        while (iterator.hasNext()) {
+            String fileName = iterator.next().getFieldAs(0);
+            fileStoreTable.fileIO().deleteQuietly(new Path(fileName));
+            deletedFileNums++;
+        }
+
+        iterator.close();
+    }
+
+    /**
+     * List all the used files by snapshots and tags, and store them in view USED(fileName STRING).
+     */
+    private void buildUsedFilesView(FileStoreTable table) throws IOException {
+        SnapshotManager snapshotManager = table.snapshotManager();
+        TagManager tagManager = table.tagManager();
+
+        long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+        long latestSnapshotId = snapshotManager.latestSnapshotId();
+
+        SortedMap<Snapshot, String> tags = tagManager.tags();

Review Comment:
   Reading tags should ignore file-not-found exceptions.
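A minimal sketch of the requested behavior, assuming tags are read one file at a time: a tag that is listed but deleted before it is read (for example, by a concurrent tag deletion) is skipped instead of failing the whole action, while any other I/O error still propagates. `TagFileReader` and `readTags` are hypothetical; the sketch does not reproduce how `TagManager#tags()` actually reads tag files. Both `FileNotFoundException` and `NoSuchFileException` are caught because Java's `java.io` and `java.nio.file` APIs report a missing file with different exception types.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical helper that reads tags while tolerating concurrently deleted tag files. */
final class TagReadingSketch {
    private TagReadingSketch() {}

    /** Hypothetical reader that loads the snapshot id recorded in one tag file. */
    @FunctionalInterface
    interface TagFileReader {
        long readSnapshotId(String tagName) throws IOException;
    }

    /**
     * Reads every listed tag, skipping tags whose file disappeared between listing and reading.
     * Any other IOException is rethrown so that the removal process stops.
     */
    static SortedMap<String, Long> readTags(List<String> tagNames, TagFileReader reader)
            throws IOException {
        SortedMap<String, Long> tags = new TreeMap<>();
        for (String name : tagNames) {
            try {
                tags.put(name, reader.readSnapshotId(name));
            } catch (FileNotFoundException | NoSuchFileException e) {
                // The tag was deleted after it was listed, so it no longer protects any file.
            }
        }
        return tags;
    }
}
```

Skipping only "not found" errors keeps the safety rule from the class Javadoc intact: a tag that still exists but cannot be read aborts the action rather than letting its files be deleted.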





Re: [PR] [core][flink] Introduce orphan files clean and corresponding flink action [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi merged PR #2357:
URL: https://github.com/apache/incubator-paimon/pull/2357




Re: [PR] [flink] Support remove-orphan-files action [incubator-paimon]

Posted by "yuzelin (via GitHub)" <gi...@apache.org>.
yuzelin commented on PR #2357:
URL: https://github.com/apache/incubator-paimon/pull/2357#issuecomment-1820324261

   Some of this code is based on #2223.

