You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@paimon.apache.org by "wg1026688210 (via GitHub)" <gi...@apache.org> on 2023/08/13 03:15:45 UTC

[GitHub] [incubator-paimon] wg1026688210 commented on a diff in pull request #1753: [core] flink file clean action

wg1026688210 commented on code in PR #1753:
URL: https://github.com/apache/incubator-paimon/pull/1753#discussion_r1292624669


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FileCleanAction.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+/** File Clean table orphan file action for Flink. */
+public class FileCleanAction extends TableActionBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileCleanAction.class);
+    private static final int MAX_DRIVER_LISTING_DEPTH = 3;
+    private static final int MAX_DRIVER_LISTING_DIRECT_SUB_DIRS = 10;
+    private static final long olderThanTimestamp =
+            System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
+
+    private Path basePath;
+    private FileIO fileIO;
+
+    public FileCleanAction(
+            String warehouse,
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig) {
+        super(warehouse, databaseName, tableName, catalogConfig);
+
+        if (!(table instanceof FileStoreTable)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Only FileStoreTable supports drop-partition action. The table type is '%s'.",
+                            table.getClass().getName()));
+        }
+        AbstractFileStoreTable fileStoreTable = (AbstractFileStoreTable) table;
+        basePath = fileStoreTable.location();
+        fileIO = fileStoreTable.fileIO();
+    }
+
+    @Override
+    public void run() throws Exception {
+        LOG.info(
+                "Scan all files and filter out that still used by at least one snapshot, then delete those not used by any snapshot..");
+        // 1: get all files  of the table path
+        List<String> allFileList = buildAllFileList(basePath.getPath());
+        // 2: get all the used files of the table;exclude the manifest , schema  and snapshot files;
+        List<String> validFileNameList = buildValidFileNameList();
+        // 3: get the diff of  the files
+        allFileList.remove(validFileNameList);
+        LOG.info("orphan files:{}", allFileList.size());
+        // 4:delete the file
+        allFileList.stream()

Review Comment:
   How about adding an option to configure the number of threads used to delete the orphan files?



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FileCleanAction.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+/** File Clean table orphan file action for Flink. */
+public class FileCleanAction extends TableActionBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileCleanAction.class);
+    private static final int MAX_DRIVER_LISTING_DEPTH = 3;
+    private static final int MAX_DRIVER_LISTING_DIRECT_SUB_DIRS = 10;
+    private static final long olderThanTimestamp =
+            System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
+
+    private Path basePath;
+    private FileIO fileIO;
+
+    public FileCleanAction(
+            String warehouse,
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig) {
+        super(warehouse, databaseName, tableName, catalogConfig);
+
+        if (!(table instanceof FileStoreTable)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Only FileStoreTable supports drop-partition action. The table type is '%s'.",
+                            table.getClass().getName()));
+        }
+        AbstractFileStoreTable fileStoreTable = (AbstractFileStoreTable) table;
+        basePath = fileStoreTable.location();
+        fileIO = fileStoreTable.fileIO();
+    }
+
+    @Override
+    public void run() throws Exception {
+        LOG.info(
+                "Scan all files and filter out that still used by at least one snapshot, then delete those not used by any snapshot..");
+        // 1: get all files  of the table path
+        List<String> allFileList = buildAllFileList(basePath.getPath());
+        // 2: get all the used files of the table;exclude the manifest , schema  and snapshot files;
+        List<String> validFileNameList = buildValidFileNameList();
+        // 3: get the diff of  the files
+        allFileList.remove(validFileNameList);
+        LOG.info("orphan files:{}", allFileList.size());
+        // 4:delete the file
+        allFileList.stream()
+                .flatMap(
+                        s -> {
+                            try {
+                                fileIO.delete(new Path(s), true);
+                            } catch (IOException e) {
+                                e.printStackTrace();
+                            }
+                            return null;
+                        });
+    }
+
+    /**
+     * query the valid files of the table.
+     *
+     * @return
+     */
+    private List<String> buildValidFileNameList() {
+        // allfiles
+        org.apache.flink.table.api.Table allDataFilesTableResult =
+                batchTEnv.sqlQuery(
+                        String.format(
+                                "SELECT * FROM %s ", identifier.getEscapedFullName() + "$files"));
+        // query the manifest
+        org.apache.flink.table.api.Table manifestFilesTableResult =
+                batchTEnv.sqlQuery(
+                        String.format(
+                                "SELECT * FROM %s ", identifier.getEscapedFullName() + "$files"));

Review Comment:
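   Should this query `$manifests` instead? As written, both this query and the one above read the `$files` system table, even though this one is commented as querying the manifests.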



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FileCleanActionFactory.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Factory to create {@link FileCleanAction}. */
+public class FileCleanActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "file-clean";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterTool params) {
+        Tuple3<String, String, String> tablePath = getTablePath(params);
+
+        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+
+        FileCleanAction action =
+                new FileCleanAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig);
+
+        return Optional.of(action);
+    }
+
+    public void printHelp() {
+        System.out.println("Action \"file-clean\" clean data files from a table that not used.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  file-clean --warehouse <warehouse-path> --database <database-name> "
+                        + "--table <table-name> ");
+
+        System.out.println("Examples:");
+        System.out.println(
+                "  file-clean --path hdfs:///path/to/warehouse/test_db.db/test_table --where id > (SELECT count(*) FROM employee)");

Review Comment:
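   What is the `--where` option used for? The example passes it, but the syntax line above does not list it and `create` never reads it.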



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FileCleanAction.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+/** File Clean table orphan file action for Flink. */
+public class FileCleanAction extends TableActionBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileCleanAction.class);
+    private static final int MAX_DRIVER_LISTING_DEPTH = 3;

Review Comment:
   Should this listing depth be inferred from the number of partition fields plus the bucket directory level, rather than hard-coded to 3?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org