Posted to issues@paimon.apache.org by "john8628 (via GitHub)" <gi...@apache.org> on 2023/08/07 12:44:59 UTC

[GitHub] [incubator-paimon] john8628 opened a new pull request, #1753: add flink file clean action

john8628 opened a new pull request, #1753:
URL: https://github.com/apache/incubator-paimon/pull/1753

   <!-- Please specify the module before the PR name: [core] ... or [flink] ... -->
   
   ### Purpose
   
   <!-- Linking this pull request to the issue -->
   Linked issue: close #1571 
   
   <!-- What is the purpose of the change -->
   
   ### Tests
   
   <!-- List UT and IT cases to verify this change -->
   
   ### API and Format
   
   <!-- Does this change affect API or storage format -->
   
   ### Documentation
   
   <!-- Does this change introduce a new feature -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-paimon] zhangjun0x01 commented on pull request #1753: [core] flink file clean action

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on PR #1753:
URL: https://github.com/apache/incubator-paimon/pull/1753#issuecomment-1667953159

   I think we should check not only the data files but also the metadata.



Re: [PR] [core] flink file clean action [incubator-paimon]

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on PR #1753:
URL: https://github.com/apache/incubator-paimon/pull/1753#issuecomment-1764696651

   > Sorry, I can't continue working on this PR.
   
   I would like to take it over.



Re: [PR] [core] flink file clean action [incubator-paimon]

Posted by "john8628 (via GitHub)" <gi...@apache.org>.
john8628 commented on PR #1753:
URL: https://github.com/apache/incubator-paimon/pull/1753#issuecomment-1762893507

   Sorry, I can't continue working on this PR.



[GitHub] [incubator-paimon] john8628 commented on pull request #1753: [core] flink file clean action

Posted by "john8628 (via GitHub)" <gi...@apache.org>.
john8628 commented on PR #1753:
URL: https://github.com/apache/incubator-paimon/pull/1753#issuecomment-1676713265

   
   > Left some minor comments
   
   Thanks for your reply, good idea.



[GitHub] [incubator-paimon] john8628 commented on pull request #1753: [core] flink file clean action

Posted by "john8628 (via GitHub)" <gi...@apache.org>.
john8628 commented on PR #1753:
URL: https://github.com/apache/incubator-paimon/pull/1753#issuecomment-1671282085

   > Hi, please run `mvn spotless:apply` in your dev environment to fix the Checkstyle violations.
   
   OK, thanks, I will do it.



Re: [PR] [core] flink file clean action [incubator-paimon]

Posted by "john8628 (via GitHub)" <gi...@apache.org>.
john8628 closed pull request #1753: [core] flink file clean action
URL: https://github.com/apache/incubator-paimon/pull/1753



[GitHub] [incubator-paimon] wg1026688210 commented on a diff in pull request #1753: [core] flink file clean action

Posted by "wg1026688210 (via GitHub)" <gi...@apache.org>.
wg1026688210 commented on code in PR #1753:
URL: https://github.com/apache/incubator-paimon/pull/1753#discussion_r1292624669


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FileCleanAction.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+/** File Clean table orphan file action for Flink. */
+public class FileCleanAction extends TableActionBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileCleanAction.class);
+    private static final int MAX_DRIVER_LISTING_DEPTH = 3;
+    private static final int MAX_DRIVER_LISTING_DIRECT_SUB_DIRS = 10;
+    private static final long olderThanTimestamp =
+            System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
+
+    private Path basePath;
+    private FileIO fileIO;
+
+    public FileCleanAction(
+            String warehouse,
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig) {
+        super(warehouse, databaseName, tableName, catalogConfig);
+
+        if (!(table instanceof FileStoreTable)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Only FileStoreTable supports drop-partition action. The table type is '%s'.",
+                            table.getClass().getName()));
+        }
+        AbstractFileStoreTable fileStoreTable = (AbstractFileStoreTable) table;
+        basePath = fileStoreTable.location();
+        fileIO = fileStoreTable.fileIO();
+    }
+
+    @Override
+    public void run() throws Exception {
+        LOG.info(
+                "Scan all files and filter out that still used by at least one snapshot, then delete those not used by any snapshot..");
+        // 1: get all files  of the table path
+        List<String> allFileList = buildAllFileList(basePath.getPath());
+        // 2: get all the used files of the table;exclude the manifest , schema  and snapshot files;
+        List<String> validFileNameList = buildValidFileNameList();
+        // 3: get the diff of  the files
+        allFileList.remove(validFileNameList);
+        LOG.info("orphan files:{}", allFileList.size());
+        // 4:delete the file
+        allFileList.stream()

Review Comment:
   How about adding an option for the number of threads?
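A minimal sketch of what such an option could look like, assuming a hypothetical `threadNum` parameter and a generic `Deleter` callback standing in for the `fileIO.delete(new Path(s), true)` call in the diff (all names here are illustrative, not part of the PR). It also submits every deletion eagerly: the `allFileList.stream().flatMap(...)` pipeline in the diff has no terminal operation, and streams are lazy, so as written it would never invoke the delete.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: deletes candidate files on a fixed-size thread pool. */
class ParallelDeleter {

    /** Stand-in for a call such as FileIO#delete; may throw IOException. */
    @FunctionalInterface
    interface Deleter {
        boolean delete(String path) throws IOException;
    }

    /** Deletes all paths using {@code threadNum} threads; returns how many deletes succeeded. */
    static int deleteInParallel(List<String> paths, int threadNum, Deleter deleter) {
        if (threadNum < 1) {
            throw new IllegalArgumentException("threadNum must be >= 1, got " + threadNum);
        }
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (String path : paths) {
                // Submitted immediately, unlike a stream pipeline without a terminal operation.
                results.add(pool.submit(() -> deleter.delete(path)));
            }
            int deleted = 0;
            for (Future<Boolean> result : results) {
                try {
                    if (result.get()) {
                        deleted++;
                    }
                } catch (ExecutionException e) {
                    // One failed delete is reported but does not abort the rest of the clean-up.
                    System.err.println("Failed to delete: " + e.getCause());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while deleting files", e);
                }
            }
            return deleted;
        } finally {
            pool.shutdown();
        }
    }
}
```

Collecting the `Future`s and checking each one keeps per-file failures visible instead of swallowing them with `printStackTrace()`.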



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FileCleanAction.java:
##########
@@ -0,0 +1,205 @@
+    @Override
+    public void run() throws Exception {
+        LOG.info(
+                "Scan all files and filter out that still used by at least one snapshot, then delete those not used by any snapshot..");
+        // 1: get all files  of the table path
+        List<String> allFileList = buildAllFileList(basePath.getPath());
+        // 2: get all the used files of the table;exclude the manifest , schema  and snapshot files;
+        List<String> validFileNameList = buildValidFileNameList();
+        // 3: get the diff of  the files
+        allFileList.remove(validFileNameList);
+        LOG.info("orphan files:{}", allFileList.size());
+        // 4:delete the file
+        allFileList.stream()
+                .flatMap(
+                        s -> {
+                            try {
+                                fileIO.delete(new Path(s), true);
+                            } catch (IOException e) {
+                                e.printStackTrace();
+                            }
+                            return null;
+                        });
+    }
+
+    /**
+     * query the valid files of the table.
+     *
+     * @return
+     */
+    private List<String> buildValidFileNameList() {
+        // allfiles
+        org.apache.flink.table.api.Table allDataFilesTableResult =
+                batchTEnv.sqlQuery(
+                        String.format(
+                                "SELECT * FROM %s ", identifier.getEscapedFullName() + "$files"));
+        // query the manifest
+        org.apache.flink.table.api.Table manifestFilesTableResult =
+                batchTEnv.sqlQuery(
+                        String.format(
+                                "SELECT * FROM %s ", identifier.getEscapedFullName() + "$files"));

Review Comment:
   $manifests?
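Related to the diff step: `allFileList.remove(validFileNameList)` does not compute a difference. `List.remove(Object)` removes the first element equal to its argument, and no `String` element equals a `List`, so the list is left unchanged. A minimal sketch of the intended step, assuming the in-use names from the `$files` and `$manifests` queries have already been collected into string collections, and that file names are unique within the table (the class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the diff step: all listed files minus those still in use. */
class OrphanDiff {

    /** Returns the last path segment, e.g. "bucket-0/data-1.orc" becomes "data-1.orc". */
    static String fileName(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    /** Returns the paths whose file name is referenced by neither the data files nor the manifests. */
    static List<String> findOrphans(
            Collection<String> allFiles,
            Collection<String> usedDataFiles,
            Collection<String> usedManifests) {
        // A hash set gives constant-time lookups instead of a List.contains scan per candidate.
        Set<String> inUse = new HashSet<>();
        usedDataFiles.forEach(p -> inUse.add(fileName(p)));
        usedManifests.forEach(p -> inUse.add(fileName(p)));

        List<String> orphans = new ArrayList<>();
        for (String path : allFiles) {
            if (!inUse.contains(fileName(path))) {
                orphans.add(path);
            }
        }
        return orphans;
    }
}
```

Normalizing both sides to bare file names avoids comparing full listing paths against names, which would mark every file as an orphan.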



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FileCleanActionFactory.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Factory to create {@link FileCleanAction}. */
+public class FileCleanActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "file-clean";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterTool params) {
+        Tuple3<String, String, String> tablePath = getTablePath(params);
+
+        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+
+        FileCleanAction action =
+                new FileCleanAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig);
+
+        return Optional.of(action);
+    }
+
+    public void printHelp() {
+        System.out.println("Action \"file-clean\" clean data files from a table that not used.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  file-clean --warehouse <warehouse-path> --database <database-name> "
+                        + "--table <table-name> ");
+
+        System.out.println("Examples:");
+        System.out.println(
+                "  file-clean --path hdfs:///path/to/warehouse/test_db.db/test_table --where id > (SELECT count(*) FROM employee)");

Review Comment:
   What's the purpose of the `where` option?



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/FileCleanAction.java:
##########
@@ -0,0 +1,205 @@
+/** File Clean table orphan file action for Flink. */
+public class FileCleanAction extends TableActionBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileCleanAction.class);
+    private static final int MAX_DRIVER_LISTING_DEPTH = 3;

Review Comment:
   Do we need to infer based on the number of partition fields and bucket directories?
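One way to answer this, sketched under an assumption about the layout: if data files live under `<table>/<k1=v1>/.../<kN=vN>/bucket-<b>/`, the listing depth can be derived from the number of partition keys instead of a fixed `MAX_DRIVER_LISTING_DEPTH = 3`. The sketch walks a local directory with `java.nio.file` rather than Paimon's `FileIO`, and the class name `DepthAwareLister` is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch: derive the listing depth from the table's partition keys. */
class DepthAwareLister {

    /**
     * Assumes the layout {@code <table>/<k1=v1>/.../<kN=vN>/bucket-<b>/<file>}, so a data file
     * sits N + 2 levels below the table root (N partition dirs, one bucket dir, the file).
     */
    static int dataFileDepth(List<String> partitionKeys) {
        return partitionKeys.size() + 2;
    }

    /** Lists regular files at exactly the inferred depth whose parent is a bucket directory. */
    static List<Path> listDataFiles(Path tableRoot, List<String> partitionKeys) {
        int depth = dataFileDepth(partitionKeys);
        try (Stream<Path> paths = Files.walk(tableRoot, depth)) {
            return paths.filter(Files::isRegularFile)
                    .filter(p -> tableRoot.relativize(p).getNameCount() == depth)
                    .filter(p -> p.getParent().getFileName().toString().startsWith("bucket-"))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Bounding the walk this way also keeps files in shallower metadata directories (for example `snapshot/`) out of the data-file listing.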




[GitHub] [incubator-paimon] john8628 commented on pull request #1753: [core] flink file clean action

Posted by "john8628 (via GitHub)" <gi...@apache.org>.
john8628 commented on PR #1753:
URL: https://github.com/apache/incubator-paimon/pull/1753#issuecomment-1667972985

   > Hi @john8628, could you add test cases and documentation?
   
   Thank you for your review. I will add them shortly, once I am sure my implementation is correct.
   
   > I think we should check not only the data files but also the metadata.
   
   Yes, metadata files should not be deleted; I will add checks to ensure they remain intact.
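Following the reply above, a minimal sketch of keeping metadata out of the deletion candidates. It assumes the metadata lives in `snapshot/`, `schema/` and `manifest/` directories directly under the table root and that candidate paths are relative to that root; both are assumptions about the layout, and the class name `MetadataGuard` is hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical helper: drops metadata paths from the orphan-file candidates. */
class MetadataGuard {

    // Assumed names of the metadata directories directly under the table root.
    private static final Set<String> METADATA_DIRS = Set.of("snapshot", "schema", "manifest");

    /** Returns true if the path (relative to the table root) is inside a metadata directory. */
    static boolean isMetadata(String relativePath) {
        int slash = relativePath.indexOf('/');
        String firstSegment = slash < 0 ? relativePath : relativePath.substring(0, slash);
        return METADATA_DIRS.contains(firstSegment);
    }

    /** Keeps only candidates that are not metadata files. */
    static List<String> excludeMetadata(List<String> candidates) {
        return candidates.stream()
                .filter(p -> !isMetadata(p))
                .collect(Collectors.toList());
    }
}
```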
   



[GitHub] [incubator-paimon] zhangjun0x01 commented on pull request #1753: [core] flink file clean action

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on PR #1753:
URL: https://github.com/apache/incubator-paimon/pull/1753#issuecomment-1667996856

   In addition, it is also very important to add an `olderThan` option, because a write first writes the data files and then commits the snapshot. If the clean action runs after the data files have been written but before the snapshot is committed, it will delete the newly written data.
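A minimal sketch of such a guard, assuming each candidate comes with a last-modified time from the file listing. `Candidate`, `deletable` and the parameter names are hypothetical, not Paimon or PR APIs.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch of an olderThan guard for orphan-file candidates. */
class OlderThanFilter {

    /** Minimal stand-in for a listing entry: a path plus its last-modified time. */
    static final class Candidate {
        final String path;
        final Instant modified;

        Candidate(String path, Instant modified) {
            this.path = path;
            this.modified = modified;
        }
    }

    /**
     * Keeps only candidates modified strictly before {@code now - olderThan}. Files written by a
     * commit that is still in flight are newer than the cutoff and therefore survive.
     */
    static List<String> deletable(List<Candidate> candidates, Instant now, Duration olderThan) {
        Instant cutoff = now.minus(olderThan);
        return candidates.stream()
                .filter(c -> c.modified.isBefore(cutoff))
                .map(c -> c.path)
                .collect(Collectors.toList());
    }
}
```

Passing `now` in, rather than computing a `static final` timestamp when the class is loaded, makes the cutoff reflect when the action actually runs and keeps the filter easy to test.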



[GitHub] [incubator-paimon] yuzelin commented on pull request #1753: [core] flink file clean action

Posted by "yuzelin (via GitHub)" <gi...@apache.org>.
yuzelin commented on PR #1753:
URL: https://github.com/apache/incubator-paimon/pull/1753#issuecomment-1670585730

   Hi, please run `mvn spotless:apply` in your dev environment to fix the Checkstyle violations.



[GitHub] [incubator-paimon] zhangjun0x01 commented on pull request #1753: [core] flink file clean action

Posted by "zhangjun0x01 (via GitHub)" <gi...@apache.org>.
zhangjun0x01 commented on PR #1753:
URL: https://github.com/apache/incubator-paimon/pull/1753#issuecomment-1667949486

   Hi @john8628, could you add test cases and documentation?



[GitHub] [incubator-paimon] john8628 commented on pull request #1753: [core] flink file clean action

Posted by "john8628 (via GitHub)" <gi...@apache.org>.
john8628 commented on PR #1753:
URL: https://github.com/apache/incubator-paimon/pull/1753#issuecomment-1675658602

   @yuzelin @zhangjun0x01 @JingsongLi PTAL 



[GitHub] [incubator-paimon] john8628 commented on pull request #1753: [core] flink file clean action

Posted by "john8628 (via GitHub)" <gi...@apache.org>.
john8628 commented on PR #1753:
URL: https://github.com/apache/incubator-paimon/pull/1753#issuecomment-1668012317

   > In addition, it is also very important to add an `olderThan` option, because a write first writes the data files and then commits the snapshot. If the clean action runs after the data files have been written but before the snapshot is committed, it will delete the newly written data.
   
   Thanks for raising this important point. You're absolutely right: an `olderThan` parameter is essential to prevent the accidental deletion of newly written data when the clean action runs between writing the data files and committing the snapshot.
   
   

