Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/10/28 09:47:32 UTC

[GitHub] [hudi] hotienvu opened a new pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

hotienvu opened a new pull request #2210:
URL: https://github.com/apache/hudi/pull/2210


   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   Since DeltaStreamer makes heavy use of file listing, a source directory containing many tiny files can quickly become a bottleneck.
   This PR provides an option to clean up the source after each batch.
   
   
   ## Brief change log
   
   * `hoodie.deltastreamer.source.dfs.clean.mode` option is added. Available options are: delete, archive, off. Default value: off (see the illustrative snippet below)
   * A `postCommit()` API is added to the `Source` interface that gets called after each commit
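
   As a quick illustration, the new options could be wired up as follows. This is a sketch only: the key names come from this patch, but the paths and values are made-up examples, and in practice they would live in the DeltaStreamer properties file rather than be set programmatically.

   ```java
   import org.apache.hudi.common.config.TypedProperties;

   public class CleanupConfigExample {
     public static void main(String[] args) {
       TypedProperties props = new TypedProperties();
       // cleanup mode: delete, archive, or off (default)
       props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "archive");
       // only used in archive mode; must not be a child of the source root (made-up path)
       props.setProperty("hoodie.deltastreamer.source.dfs.clean.archiveDir", "/data/archive");
       // 0 = clean synchronously; > 0 = clean asynchronously on a fixed thread pool
       props.setProperty("hoodie.deltastreamer.source.dfs.clean.numThreads", "2");
     }
   }
   ```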
   
   ## Verify this pull request
   
   This change added tests and can be verified as follows:
   
     - Added extra tests to TestHoodieDeltaStreamer
     - Added extra tests to AbstractDFSSourceTestBase
     - Manually verified the change by running a job locally.
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-784223991


   Update: I couldn't reproduce locally. I need some more time to look into it. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot removed a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hudi-bot removed a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-862028641


   ## CI report:
   
   * 67dabdd51934a7141a299114de2b836b1f016fd5 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=2166) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-770377916


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=h1) Report
   > Merging [#2210](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=desc) (a77d8a5) into [master](https://codecov.io/gh/apache/hudi/commit/5d053b495b8cb44cce88a67e82cbdfdc3d8b3180?el=desc) (5d053b4) will **increase** coverage by `20.43%`.
   > The diff coverage is `81.65%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2210/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #2210       +/-   ##
   =============================================
   + Coverage     49.78%   70.21%   +20.43%     
   + Complexity     3089      379     -2710     
   =============================================
     Files           430       54      -376     
     Lines         19566     2038    -17528     
     Branches       2004      240     -1764     
   =============================================
   - Hits           9741     1431     -8310     
   + Misses         9033      470     -8563     
   + Partials        792      137      -655     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `70.21% <81.65%> (+8.35%)` | `0.00 <21.00> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...g/apache/hudi/utilities/sources/AvroDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb0RGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...i/utilities/sources/helpers/FileSourceCleaner.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9GaWxlU291cmNlQ2xlYW5lci5qYXZh) | `83.58% <83.58%> (ø)` | `9.00 <9.00> (?)` | |
   | [...apache/hudi/utilities/deltastreamer/DeltaSync.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvRGVsdGFTeW5jLmphdmE=) | `70.96% <100.00%> (+0.46%)` | `51.00 <0.00> (+1.00)` | |
   | [...i/utilities/deltastreamer/SourceFormatAdapter.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvU291cmNlRm9ybWF0QWRhcHRlci5qYXZh) | `87.17% <100.00%> (+0.69%)` | `12.00 <1.00> (+1.00)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `100.00% <100.00%> (ø)` | `13.00 <3.00> (+3.00)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `100.00% <100.00%> (ø)` | `7.00 <3.00> (+3.00)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `100.00% <100.00%> (ø)` | `8.00 <3.00> (+3.00)` | |
   | [...java/org/apache/hudi/utilities/sources/Source.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvU291cmNlLmphdmE=) | `88.23% <100.00%> (+0.73%)` | `6.00 <1.00> (+1.00)` | |
   | [...udi/utilities/sources/helpers/DFSPathSelector.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9ERlNQYXRoU2VsZWN0b3IuamF2YQ==) | `85.10% <100.00%> (+0.32%)` | `16.00 <1.00> (+1.00)` | |
   | [...apache/hudi/cli/commands/HoodieLogFileCommand.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL2NvbW1hbmRzL0hvb2RpZUxvZ0ZpbGVDb21tYW5kLmphdmE=) | | | |
   | ... and [383 more](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hotienvu commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hotienvu commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r567408569



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {

Review comment:
       It is only for file sources, and file sources are the only source type that needs cleanup AFAIK. Other sources such as Kafka or database pulls may not need cleanup, but rather other types of post-commit ops.
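
       A minimal sketch of that separation, with assumed names rather than the exact diff: the sync loop calls postCommit() after each successful commit, the default implementation is a no-op so Kafka/database sources are unaffected, and only file-based sources override it to trigger cleanup.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Consumer;

   // Sketch only: stand-ins for the Source/FileSourceCleaner classes in this PR.
   abstract class SketchSource {
     /** Called after each successful commit; no-op by default. */
     public void postCommit() {}
   }

   class SketchFileSource extends SketchSource {
     private final Consumer<String> cleaner;               // stands in for FileSourceCleaner::clean
     private final List<String> ingested = new ArrayList<>();

     SketchFileSource(Consumer<String> cleaner) { this.cleaner = cleaner; }

     void recordIngestedFile(String path) { ingested.add(path); }

     @Override
     public void postCommit() {
       ingested.forEach(cleaner);                          // delete or archive each source file
       ingested.clear();
     }
   }
   ```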




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hudi-bot edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-862028641


   ## CI report:
   
   * b845e34d11e4e44e2b41e2089349baddc3a10b80 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=210) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r570157010



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off (default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {
+      cleanerPool.get().submit(() -> cleanTask(file));
+    } else {
+      cleanTask(file);
+    }
+  }
+
+  abstract void cleanTask(String file);
+
+  private static class FileSourceRemover extends FileSourceCleaner {
+    private final FileSystem fs;
+    public FileSourceRemover(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info(String.format("Removing %s...", file));
+      try {
+        if (fs.delete(new Path(file), false)) {
+          LOG.info(String.format("Successfully removed %s", file));
+        } else {
+          LOG.warn(String.format("Failed to remove %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to remove %s", file), e);
+      }
+    }
+  }
+
+  private static class FileSourceArchiver extends FileSourceCleaner {
+    private final FileSystem fs;
+    private final Path archiveDir;
+    private final Path sourceRootDir;
+
+    public FileSourceArchiver(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+      this.archiveDir = new Path(props.getString(Config.FILE_SOURCE_ARCHIVE_DIR_KEY));
+      this.sourceRootDir = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+      ValidationUtils.checkArgument(!isSubDir(archiveDir, sourceRootDir),
+          String.format("%s must not be child of %s", Config.FILE_SOURCE_ARCHIVE_DIR_KEY, ROOT_INPUT_PATH_PROP));
+    }
+
+    private boolean isSubDir(Path childDir, Path parentDir) {
+      while (childDir != null) {
+        if (childDir.equals(parentDir)) {
+          return true;
+        }
+        childDir = childDir.getParent();
+      }
+      return false;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      try {
+        final Path original = new Path(file);
+        final Path fileDir = original.getParent();
+        Path relativeDir = getRelativeDir(fileDir, sourceRootDir);
+        final Path newDir = new Path(archiveDir, relativeDir);
+        LOG.info("Creating directory if it does not exist: " + newDir.toString());
+        fs.mkdirs(newDir);
+
+        final Path newFile = new Path(newDir, original.getName());
+        LOG.info(String.format("Renaming: %s to %s", original.toString(), newFile));
+        if (fs.rename(original, newFile)) {
+          LOG.info(String.format("Successfully archived %s", file));
+        } else {
+          LOG.warn(String.format("Failed to archive %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to archive %s", file), e);
+      }
+    }
+
+    private Path getRelativeDir(Path childPath, Path parentPath) {
+      LinkedList<String> paths = new LinkedList<>();
+      while (childPath != null && !childPath.equals(parentPath)) {
+        paths.addFirst(childPath.getName());
+        childPath = childPath.getParent();
+      }
+      return new Path(paths.isEmpty() ? "." : String.join("/", paths));

Review comment:
       Thanks for clarifying. Actually, we can leave it as is.
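
       For concreteness, a worked example of the relativization this method performs (plain java.nio here purely for illustration; the patch itself uses Hadoop Paths): with source root /data/in and archive dir /data/archive, a file /data/in/2020/10/28/1.parquet should land at /data/archive/2020/10/28/1.parquet.

   ```java
   import java.nio.file.Path;
   import java.nio.file.Paths;

   public class RelativeDirExample {
     public static void main(String[] args) {
       Path sourceRoot = Paths.get("/data/in");
       Path file = Paths.get("/data/in/2020/10/28/1.parquet");
       // mirrors getRelativeDir(fileDir, sourceRootDir) in the diff above
       Path relativeDir = sourceRoot.relativize(file.getParent());  // 2020/10/28
       Path newFile = Paths.get("/data/archive").resolve(relativeDir).resolve(file.getFileName());
       System.out.println(newFile);  // /data/archive/2020/10/28/1.parquet
     }
   }
   ```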




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hudi-bot edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-862028641


   ## CI report:
   
   * a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1668) 
   * 67dabdd51934a7141a299114de2b836b1f016fd5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-781095795


   @hotienvu : If you are busy, I can look at addressing any pending feedback and looking into the build failure. Will push updates to the patch w/ any fixes if you are ok.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hotienvu commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hotienvu commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r567458193



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off (default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {

Review comment:
       Added an additional check for directory paths under FileSourceArchiver's cleanTask.
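
       Presumably a guard along these lines (a sketch of the described check under assumed names, not the actual commit):

   ```java
   import java.io.IOException;

   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;

   // Sketch: cleanTask would skip directory paths instead of trying to archive them.
   class DirectoryGuardSketch {
     static boolean shouldArchive(FileSystem fs, Path path) throws IOException {
       return fs.getFileStatus(path).isFile();  // false for directories
     }
   }
   ```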




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hotienvu commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hotienvu commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r567457504



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off (default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {
+      cleanerPool.get().submit(() -> cleanTask(file));
+    } else {
+      cleanTask(file);
+    }
+  }
+
+  abstract void cleanTask(String file);
+
+  private static class FileSourceRemover extends FileSourceCleaner {
+    private final FileSystem fs;
+    public FileSourceRemover(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info(String.format("Removing %s...", file));
+      try {
+        if (fs.delete(new Path(file), false)) {
+          LOG.info(String.format("Successfully removed %s", file));
+        } else {
+          LOG.warn(String.format("Failed to remove %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to remove %s", file), e);
+      }
+    }
+  }
+
+  private static class FileSourceArchiver extends FileSourceCleaner {
+    private final FileSystem fs;
+    private final Path archiveDir;
+    private final Path sourceRootDir;
+
+    public FileSourceArchiver(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+      this.archiveDir = new Path(props.getString(Config.FILE_SOURCE_ARCHIVE_DIR_KEY));
+      this.sourceRootDir = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+      ValidationUtils.checkArgument(!isSubDir(archiveDir, sourceRootDir),
+          String.format("%s must not be child of %s", Config.FILE_SOURCE_ARCHIVE_DIR_KEY, ROOT_INPUT_PATH_PROP));
+    }
+
+    private boolean isSubDir(Path childDir, Path parentDir) {
+      while (childDir != null) {
+        if (childDir.equals(parentDir)) {
+          return true;
+        }
+        childDir = childDir.getParent();
+      }
+      return false;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      try {
+        final Path original = new Path(file);
+        final Path fileDir = original.getParent();
+        Path relativeDir = getRelativeDir(fileDir, sourceRootDir);
+        final Path newDir = new Path(archiveDir, relativeDir);
+        LOG.info("Creating directory if it does not exist: " + newDir.toString());
+        fs.mkdirs(newDir);
+
+        final Path newFile = new Path(newDir, original.getName());
+        LOG.info(String.format("Renaming: %s to %s", original.toString(), newFile));
+        if (fs.rename(original, newFile)) {
+          LOG.info(String.format("Successfully archived %s", file));
+        } else {
+          LOG.warn(String.format("Failed to archive %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to archive %s", file), e);
+      }
+    }
+
+    private Path getRelativeDir(Path childPath, Path parentPath) {
+      LinkedList<String> paths = new LinkedList<>();
+      while (childPath != null && !childPath.equals(parentPath)) {
+        paths.addFirst(childPath.getName());
+        childPath = childPath.getParent();
+      }
+      return new Path(paths.isEmpty() ? "." : String.join("/", paths));

Review comment:
       Right; actually the result will never be archive_folder/./ since `new Path(".")` returns `Path("")`. I can replace "." with "" if it causes confusion. Initially I thought "." would make it more obvious that we return the parent directory itself without any sub-dir.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hotienvu commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hotienvu commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-773320271


   @nsivabalan there is an integration test that times out; I tried to dig deeper but haven't succeeded so far. I'm a bit busy this week, so I will try to look again this weekend.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hotienvu commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hotienvu commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-782864578


   > @hotienvu : If you are busy, I can look at addressing any pending feedback and looking into the build failure. Will push updates to the patch w/ any fixes if you are ok.
   
   Hi @nsivabalan: sorry for the late reply. Yes, your help is much appreciated, since I'm not quite familiar with the integration test suite.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r579798666



##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -1159,6 +1163,72 @@ public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Ex
     testCsvDFSSource(false, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
+  @Test
+  public void testDFSSourceDeleteFilesAfterCommit() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "delete");
+    testDFSSourceCleanUp(props);
+  }
+
+  @Test
+  public void testDFSSourceArchiveFilesAfterCommit() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "archive");

Review comment:
       Can you please add a test for the no-op mode as well, just to ensure that with this patch it is in fact a no-op :)
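
       For example, something along these lines (a rough sketch reusing the helper from this diff; the shared helper's assertions would need to tolerate the source files remaining in place for the off mode):

   ```java
   @Test
   public void testDFSSourceNoOpCleanAfterCommit() throws Exception {
     TypedProperties props = new TypedProperties();
     // "off" is the default; set it explicitly so the intent of the test is clear
     props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "off");
     testDFSSourceCleanUp(props);
     // ...plus an assertion that the source files are still present afterwards
   }
   ```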

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -217,7 +217,7 @@ public static void initClass() throws Exception {
     invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
 
-    prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
+    prepareParquetDFSFiles(PARQUET_SOURCE_ROOT + "/1.parquet", PARQUET_NUM_RECORDS);

Review comment:
       You can make just the sourceRoot an argument; 1.parquet can be left abstracted within the method.

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
##########
@@ -189,4 +205,63 @@ public void testReadingFromSource() throws IOException {
         Option.empty(), Long.MAX_VALUE);
     assertEquals(10101, fetch6.getBatch().get().count());
   }
+
+  @Test
+  public void testCleanUpSourceAfterCommit() throws IOException {

Review comment:
       Not sure why we have tests in this class. These classes in testutils are utility classes to assist in testing and should not contain any individual tests as such. Can you move the tests from this class to a separate test class? If we don't hold the line on this, it will slowly invite everyone to add more tests to this class.

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
##########
@@ -79,14 +85,24 @@ public void setup() throws Exception {
   @AfterEach
   public void teardown() throws Exception {
     super.teardown();
+    dfs.delete(new Path(dfsRoot), true);
+    dfs.delete(new Path(dfsArchivePath), true);
   }
 
   /**
    * Prepares the specific {@link Source} to test, by passing in necessary configurations.
    *
    * @return A {@link Source} using DFS as the file system.
+   * @param defaults
    */
-  protected abstract Source prepareDFSSource();
+  protected abstract Source prepareDFSSource(TypedProperties defaults);
+
+  /**
+   * Prepares the specific {@link Source} to test.
+   */
+  protected Source prepareDFSSource() {
+    return prepareDFSSource(new TypedProperties());

Review comment:
       if we follow the proposed changes in tests, we don't need to make any changes to these tests.

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -1159,6 +1163,72 @@ public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Ex
     testCsvDFSSource(false, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
+  @Test
+  public void testDFSSourceDeleteFilesAfterCommit() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "delete");
+    testDFSSourceCleanUp(props);
+  }
+
+  @Test
+  public void testDFSSourceArchiveFilesAfterCommit() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "archive");
+    final String archivePath = dfsBasePath + "/archive";
+    dfs.mkdirs(new Path(archivePath));
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.archiveDir", archivePath);
+    assertEquals(0, Helpers.listAllFiles(archivePath).size());
+    testDFSSourceCleanUp(props);
+    // archive dir should contain source files
+    assertEquals(1, Helpers.listAllFiles(archivePath).size());
+  }
+
+  private void testDFSSourceCleanUp(TypedProperties props) throws Exception {
+    // since each source cleanup test will modify source, we need to set different dfs.root each test
+    final String dfsRoot = dfsBasePath + "/test_dfs_cleanup_source" + testNum;
+    dfs.mkdirs(new Path(dfsRoot));
+    prepareParquetDFSFiles(dfsRoot + "/1.parquet", PARQUET_NUM_RECORDS);
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+    prepareParquetDFSSource(false, false, props);

Review comment:
       Can we just move this config within prepareParquetDFSSource() itself, so that we don't need to change the prepareDFSSource signature?

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
##########
@@ -189,4 +205,63 @@ public void testReadingFromSource() throws IOException {
         Option.empty(), Long.MAX_VALUE);
     assertEquals(10101, fetch6.getBatch().get().count());
   }
+
+  @Test
+  public void testCleanUpSourceAfterCommit() throws IOException {
+    dfs.mkdirs(new Path(dfsRoot));
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "delete");
+    // use synchronous clean to be able to verify immediately
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.numThreads", "0");
+    SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(prepareDFSSource(props));
+    assertEquals(Option.empty(),
+        sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

Review comment:
       You can move lines 211 to 218 into a separate private method and reuse it in both tests, so that the main test stays lean and easy to read.
   Also, add the no-op tests as well.

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -1159,6 +1163,72 @@ public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Ex
     testCsvDFSSource(false, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
+  @Test
+  public void testDFSSourceDeleteFilesAfterCommit() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "delete");
+    testDFSSourceCleanUp(props);
+  }
+
+  @Test
+  public void testDFSSourceArchiveFilesAfterCommit() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "archive");
+    final String archivePath = dfsBasePath + "/archive";
+    dfs.mkdirs(new Path(archivePath));
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.archiveDir", archivePath);
+    assertEquals(0, Helpers.listAllFiles(archivePath).size());
+    testDFSSourceCleanUp(props);
+    // archive dir should contain source files
+    assertEquals(1, Helpers.listAllFiles(archivePath).size());
+  }
+
+  private void testDFSSourceCleanUp(TypedProperties props) throws Exception {
+    // since each source cleanup test will modify source, we need to set different dfs.root each test
+    final String dfsRoot = dfsBasePath + "/test_dfs_cleanup_source" + testNum;
+    dfs.mkdirs(new Path(dfsRoot));
+    prepareParquetDFSFiles(dfsRoot + "/1.parquet", PARQUET_NUM_RECORDS);
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+    prepareParquetDFSSource(false, false, props);
+
+    String tableBasePath = dfsBasePath + "/test_dfs_cleanup_output" + testNum;

Review comment:
       Can we use different paths for the table and dfsRoot? It looks like you use the same one. Just to avoid confusion.

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -965,14 +965,13 @@ public void testDistributedTestDataSource() {
     assertEquals(1000, c);
   }
 
-  private static void prepareParquetDFSFiles(int numRecords) throws IOException {
-    String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+  private static void prepareParquetDFSFiles(String path, int numRecords) throws IOException {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     Helpers.saveParquetToDFS(Helpers.toGenericRecords(
         dataGenerator.generateInserts("000", numRecords)), new Path(path));
   }
 
-  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, TypedProperties props) throws IOException {

Review comment:
       Maybe you can name the new arg additionalProps.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-961587498






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-commenter commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-862054362


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2210](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b845e34) into [master](https://codecov.io/gh/apache/hudi/commit/673d62f3c3ab07abb3fcd319607e657339bc0682?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (673d62f) will **increase** coverage by `41.65%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2210/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #2210       +/-   ##
   =============================================
   + Coverage      8.43%   50.09%   +41.65%     
   - Complexity       62     3091     +3029     
   =============================================
     Files            70      386      +316     
     Lines          2880    18953    +16073     
     Branches        359     1977     +1618     
   =============================================
   + Hits            243     9494     +9251     
   - Misses         2616     8661     +6045     
   - Partials         21      798      +777     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | hudicli | `39.95% <ø> (?)` | |
   | hudiclient | `∅ <ø> (∅)` | |
   | hudicommon | `48.20% <ø> (?)` | |
   | hudiflink | `60.73% <ø> (?)` | |
   | hudihadoopmr | `51.34% <ø> (?)` | |
   | hudisync | `?` | |
   | hudiutilities | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...callback/kafka/HoodieWriteCommitKafkaCallback.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2NhbGxiYWNrL2thZmthL0hvb2RpZVdyaXRlQ29tbWl0S2Fma2FDYWxsYmFjay5qYXZh) | | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | | |
   | [.../apache/hudi/hive/MultiPartKeysValueExtractor.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvTXVsdGlQYXJ0S2V5c1ZhbHVlRXh0cmFjdG9yLmphdmE=) | | |
   | [...alCheckpointFromAnotherHoodieTimelineProvider.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2NoZWNrcG9pbnRpbmcvSW5pdGlhbENoZWNrcG9pbnRGcm9tQW5vdGhlckhvb2RpZVRpbWVsaW5lUHJvdmlkZXIuamF2YQ==) | | |
   | [...ties/exception/HoodieIncrementalPullException.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2V4Y2VwdGlvbi9Ib29kaWVJbmNyZW1lbnRhbFB1bGxFeGNlcHRpb24uamF2YQ==) | | |
   | [...ache/hudi/hive/HiveMetastoreBasedLockProvider.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvSGl2ZU1ldGFzdG9yZUJhc2VkTG9ja1Byb3ZpZGVyLmphdmE=) | | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | | |
   | [...ties/deltastreamer/HoodieDeltaStreamerMetrics.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvSG9vZGllRGVsdGFTdHJlYW1lck1ldHJpY3MuamF2YQ==) | | |
   | [.../hudi/utilities/schema/SparkAvroPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TcGFya0F2cm9Qb3N0UHJvY2Vzc29yLmphdmE=) | | |
   | [...he/hudi/utilities/transform/AWSDmsTransformer.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3RyYW5zZm9ybS9BV1NEbXNUcmFuc2Zvcm1lci5qYXZh) | | |
   | ... and [438 more](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-770377916


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=h1) Report
   > Merging [#2210](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=desc) (14f757a) into [master](https://codecov.io/gh/apache/hudi/commit/5d053b495b8cb44cce88a67e82cbdfdc3d8b3180?el=desc) (5d053b4) will **decrease** coverage by `5.39%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2210/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2210      +/-   ##
   ============================================
   - Coverage     49.78%   44.38%   -5.40%     
   + Complexity     3089     2810     -279     
   ============================================
     Files           430      431       +1     
     Lines         19566    19681     +115     
     Branches       2004     2016      +12     
   ============================================
   - Hits           9741     8736    -1005     
   - Misses         9033    10262    +1229     
   + Partials        792      683     -109     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `37.21% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudicommon | `51.51% <ø> (+0.02%)` | `0.00 <ø> (ø)` | |
   | hudiflink | `33.03% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisparkdatasource | `69.51% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiutilities | `9.14% <0.00%> (-52.73%)` | `0.00 <0.00> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...apache/hudi/utilities/deltastreamer/DeltaSync.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvRGVsdGFTeW5jLmphdmE=) | `0.00% <0.00%> (-70.51%)` | `0.00 <0.00> (-50.00)` | |
   | [...i/utilities/deltastreamer/SourceFormatAdapter.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvU291cmNlRm9ybWF0QWRhcHRlci5qYXZh) | `0.00% <0.00%> (-86.49%)` | `0.00 <0.00> (-11.00)` | |
   | [...g/apache/hudi/utilities/sources/AvroDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb0RGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00 <0.00> (-10.00)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00 <0.00> (-4.00)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00 <0.00> (-5.00)` | |
   | [...java/org/apache/hudi/utilities/sources/Source.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvU291cmNlLmphdmE=) | `0.00% <0.00%> (-87.50%)` | `0.00 <0.00> (-5.00)` | |
   | [...udi/utilities/sources/helpers/DFSPathSelector.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9ERlNQYXRoU2VsZWN0b3IuamF2YQ==) | `0.00% <0.00%> (-84.79%)` | `0.00 <0.00> (-15.00)` | |
   | [...i/utilities/sources/helpers/FileSourceCleaner.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9GaWxlU291cmNlQ2xlYW5lci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | ... and [37 more](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-770377916


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=h1) Report
   > Merging [#2210](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=desc) (03c6d41) into [master](https://codecov.io/gh/apache/hudi/commit/5d053b495b8cb44cce88a67e82cbdfdc3d8b3180?el=desc) (5d053b4) will **increase** coverage by `20.38%`.
   > The diff coverage is `81.65%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2210/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #2210       +/-   ##
   =============================================
   + Coverage     49.78%   70.16%   +20.38%     
   + Complexity     3089      378     -2711     
   =============================================
     Files           430       54      -376     
     Lines         19566     2038    -17528     
     Branches       2004      240     -1764     
   =============================================
   - Hits           9741     1430     -8311     
   + Misses         9033      470     -8563     
   + Partials        792      138      -654     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `70.16% <81.65%> (+8.30%)` | `0.00 <21.00> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...g/apache/hudi/utilities/sources/AvroDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb0RGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...i/utilities/sources/helpers/FileSourceCleaner.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9GaWxlU291cmNlQ2xlYW5lci5qYXZh) | `83.58% <83.58%> (ø)` | `9.00 <9.00> (?)` | |
   | [...apache/hudi/utilities/deltastreamer/DeltaSync.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvRGVsdGFTeW5jLmphdmE=) | `70.60% <100.00%> (+0.10%)` | `50.00 <0.00> (ø)` | |
   | [...i/utilities/deltastreamer/SourceFormatAdapter.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvU291cmNlRm9ybWF0QWRhcHRlci5qYXZh) | `87.17% <100.00%> (+0.69%)` | `12.00 <1.00> (+1.00)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `100.00% <100.00%> (ø)` | `13.00 <3.00> (+3.00)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `100.00% <100.00%> (ø)` | `7.00 <3.00> (+3.00)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `100.00% <100.00%> (ø)` | `8.00 <3.00> (+3.00)` | |
   | [...java/org/apache/hudi/utilities/sources/Source.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvU291cmNlLmphdmE=) | `88.23% <100.00%> (+0.73%)` | `6.00 <1.00> (+1.00)` | |
   | [...udi/utilities/sources/helpers/DFSPathSelector.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9ERlNQYXRoU2VsZWN0b3IuamF2YQ==) | `85.10% <100.00%> (+0.32%)` | `16.00 <1.00> (+1.00)` | |
   | [.../java/org/apache/hudi/common/util/HoodieTimer.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvSG9vZGllVGltZXIuamF2YQ==) | | | |
   | ... and [382 more](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-862028641


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b845e34d11e4e44e2b41e2089349baddc3a10b80 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pratyakshsharma commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-1061530216


   @hotienvu still working on this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-773247045






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hotienvu commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hotienvu commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r567443840



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off(default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {

Review comment:
       Yes, it will throw (and log) inside the cleanTask abstract method if it fails to remove or rename the file. In the case of a directory, remove will throw since we set fs.delete(..., false). Archiving a directory might be an edge case since it may also contain files to be archived. I will update the logic to exclude directories from the clean-up, as sketched below.
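   
   A minimal sketch of that directory-exclusion guard, purely for illustration (the helper name is hypothetical and not part of this PR; it assumes the same Hadoop FileSystem API the cleaner already uses):
   
   ```java
   // Hypothetical guard sketching the directory exclusion described above; not part of this PR.
   private boolean isCleanableFile(FileSystem fs, String file) {
     try {
       // Only plain files are eligible: directories are skipped so that
       // fs.delete(path, false) and fs.rename(...) never operate on a directory.
       return fs.getFileStatus(new Path(file)).isFile();
     } catch (IOException e) {
       LOG.warn(String.format("Could not stat %s; skipping clean-up", file), e);
       return false;
     }
   }
   ```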




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hudi-bot edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-862028641


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=210",
       "triggerID" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1668",
       "triggerID" : "a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "67dabdd51934a7141a299114de2b836b1f016fd5",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=2166",
       "triggerID" : "67dabdd51934a7141a299114de2b836b1f016fd5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1668) 
   * 67dabdd51934a7141a299114de2b836b1f016fd5 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=2166) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pratyakshsharma commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-768487034


   @hotienvu Still working on this? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hudi-bot edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-862028641


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=210",
       "triggerID" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b845e34d11e4e44e2b41e2089349baddc3a10b80 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=210) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hotienvu commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hotienvu commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r567403920



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
##########
@@ -125,4 +134,12 @@ public CsvDFSSource(TypedProperties props,
       return Option.empty();
     }
   }
+
+  @Override

Review comment:
       Agreed, this is not desirable. However, it is quite tricky since the current Source base classes are primarily organized by input format (Row vs. String vs. Avro) rather than by the actual source (DFS vs. Kafka, etc.). The same problem applies to the pathSelector repeated in each DFS source. Some refactoring may be required to abstract this cleanly; a rough sketch of one possible direction follows.
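   
   One possible direction, sketched purely for illustration (the interface name and shape are hypothetical, not part of this PR), is a small DFS-specific abstraction so that path selection and post-commit clean-up live in one place instead of being repeated per format:
   
   ```java
   import java.util.List;
   
   // Hypothetical sketch only: factoring DFS-specific behavior out of the
   // format-oriented Source hierarchy. All names here are illustrative.
   public interface DFSBackedSource {
     // Mirrors the DFSPathSelector that each DFS source currently instantiates itself.
     DFSPathSelector getPathSelector();
   
     // Called once after a successful commit, so format-specific DFS sources
     // no longer need their own postCommit() overrides.
     default void postCommit(List<String> ingestedFiles, FileSourceCleaner cleaner) {
       ingestedFiles.forEach(cleaner::clean);
     }
   }
   ```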




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r570157010



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off(default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {
+      cleanerPool.get().submit(() -> cleanTask(file));
+    } else {
+      cleanTask(file);
+    }
+  }
+
+  abstract void cleanTask(String file);
+
+  private static class FileSourceRemover extends FileSourceCleaner {
+    private final FileSystem fs;
+    public FileSourceRemover(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info(String.format("Removing %s...", file));
+      try {
+        if (fs.delete(new Path(file), false)) {
+          LOG.info(String.format("Successfully remove up %s", file));
+        } else {
+          LOG.warn(String.format("Failed to remove %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to remove %s", file), e);
+      }
+    }
+  }
+
+  private static class FileSourceArchiver extends FileSourceCleaner {
+    private final FileSystem fs;
+    private final Path archiveDir;
+    private final Path sourceRootDir;
+
+    public FileSourceArchiver(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+      this.archiveDir = new Path(props.getString(Config.FILE_SOURCE_ARCHIVE_DIR_KEY));
+      this.sourceRootDir = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+      ValidationUtils.checkArgument(!isSubDir(archiveDir, sourceRootDir),
+          String.format("%s must not be child of %s", Config.FILE_SOURCE_ARCHIVE_DIR_KEY, ROOT_INPUT_PATH_PROP));
+    }
+
+    private boolean isSubDir(Path childDir, Path parentDir) {
+      while (childDir != null) {
+        if (childDir.equals(parentDir)) {
+          return true;
+        }
+        childDir = childDir.getParent();
+      }
+      return false;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      try {
+        final Path original = new Path(file);
+        final Path fileDir = original.getParent();
+        Path relativeDir = getRelativeDir(fileDir, sourceRootDir);
+        final Path newDir = new Path(archiveDir, relativeDir);
+        LOG.info("Creating directory if not existent: " + newDir.toString());
+        fs.mkdirs(newDir);
+
+        final Path newFile = new Path(newDir, original.getName());
+        LOG.info(String.format("Renaming: %s to %s", original.toString(), newFile));
+        if (fs.rename(original, newFile)) {
+          LOG.info(String.format("Successfully archive %s", file));
+        } else {
+          LOG.warn(String.format("Failed to archive %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to archive %s", file), e);
+      }
+    }
+
+    private Path getRelativeDir(Path childPath, Path parentPath) {
+      LinkedList<String> paths = new LinkedList<>();
+      while (childPath != null && !childPath.equals(parentPath)) {
+        paths.addFirst(childPath.getName());
+        childPath = childPath.getParent();
+      }
+      return new Path(paths.isEmpty() ? "." : String.join("/", paths));

Review comment:
       Thanks for clarifying. Actually, we can leave it as is.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hudi-bot edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-862028641


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=210",
       "triggerID" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1668",
       "triggerID" : "a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1668) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hotienvu commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hotienvu commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r567457990



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off(default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {
+      cleanerPool.get().submit(() -> cleanTask(file));
+    } else {
+      cleanTask(file);
+    }
+  }
+
+  abstract void cleanTask(String file);
+
+  private static class FileSourceRemover extends FileSourceCleaner {
+    private final FileSystem fs;
+    public FileSourceRemover(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info(String.format("Removing %s...", file));
+      try {
+        if (fs.delete(new Path(file), false)) {
+          LOG.info(String.format("Successfully remove up %s", file));
+        } else {
+          LOG.warn(String.format("Failed to remove %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to remove %s", file), e);
+      }
+    }
+  }
+
+  private static class FileSourceArchiver extends FileSourceCleaner {
+    private final FileSystem fs;
+    private final Path archiveDir;
+    private final Path sourceRootDir;
+
+    public FileSourceArchiver(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+      this.archiveDir = new Path(props.getString(Config.FILE_SOURCE_ARCHIVE_DIR_KEY));
+      this.sourceRootDir = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+      ValidationUtils.checkArgument(!isSubDir(archiveDir, sourceRootDir),
+          String.format("%s must not be child of %s", Config.FILE_SOURCE_ARCHIVE_DIR_KEY, ROOT_INPUT_PATH_PROP));
+    }
+
+    private boolean isSubDir(Path childDir, Path parentDir) {
+      while (childDir != null) {
+        if (childDir.equals(parentDir)) {
+          return true;
+        }
+        childDir = childDir.getParent();
+      }
+      return false;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      try {
+        final Path original = new Path(file);
+        final Path fileDir = original.getParent();
+        Path relativeDir = getRelativeDir(fileDir, sourceRootDir);
+        final Path newDir = new Path(archiveDir, relativeDir);
+        LOG.info("Creating directory if not existent: " + newDir.toString());
+        fs.mkdirs(newDir);
+
+        final Path newFile = new Path(newDir, original.getName());
+        LOG.info(String.format("Renaming: %s to %s", original.toString(), newFile));
+        if (fs.rename(original, newFile)) {
+          LOG.info(String.format("Successfully archive %s", file));
+        } else {
+          LOG.warn(String.format("Failed to archive %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to archive %s", file), e);
+      }
+    }
+
+    private Path getRelativeDir(Path childPath, Path parentPath) {
+      LinkedList<String> paths = new LinkedList<>();
+      while (childPath != null && !childPath.equals(parentPath)) {
+        paths.addFirst(childPath.getName());
+        childPath = childPath.getParent();
+      }
+      return new Path(paths.isEmpty() ? "." : String.join("/", paths));

Review comment:
       Hmmm, it turns out you can't initialize a Path with "". Would `new Path(paths.isEmpty() ? "." : String.join("/", paths))` still be OK?
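   
   For reference, a quick check of those Path semantics (the archive root below is a made-up example):
   
   ```java
   import org.apache.hadoop.fs.Path;
   
   public class PathDemo {
     public static void main(String[] args) {
       // new Path("") throws IllegalArgumentException, which is why "." is
       // used as the empty relative directory instead.
       Path archiveDir = new Path("/data/archive");  // hypothetical archive root
       Path relative = new Path(".");                // getRelativeDir's result when paths is empty
       // Resolving "." against the archive root yields the root itself,
       // so archived files land directly under archiveDir as intended.
       System.out.println(new Path(archiveDir, relative));  // prints /data/archive
     }
   }
   ```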




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-commenter removed a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
codecov-commenter removed a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-862054362


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2210](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b845e34) into [master](https://codecov.io/gh/apache/hudi/commit/673d62f3c3ab07abb3fcd319607e657339bc0682?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (673d62f) will **increase** coverage by `44.07%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2210/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #2210       +/-   ##
   =============================================
   + Coverage      8.43%   52.51%   +44.07%     
   - Complexity       62     3664     +3602     
   =============================================
     Files            70      474      +404     
     Lines          2880    23997    +21117     
     Branches        359     2741     +2382     
   =============================================
   + Hits            243    12601    +12358     
   - Misses         2616    10137     +7521     
   - Partials         21     1259     +1238     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | hudicli | `39.95% <ø> (?)` | |
   | hudiclient | `∅ <ø> (∅)` | |
   | hudicommon | `48.20% <ø> (?)` | |
   | hudiflink | `60.73% <ø> (?)` | |
   | hudihadoopmr | `51.34% <ø> (?)` | |
   | hudisparkdatasource | `66.47% <ø> (?)` | |
   | hudisync | `46.79% <ø> (+40.00%)` | :arrow_up: |
   | huditimelineservice | `64.36% <ø> (?)` | |
   | hudiutilities | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ache/hudi/hive/HiveMetastoreBasedLockProvider.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvSGl2ZU1ldGFzdG9yZUJhc2VkTG9ja1Byb3ZpZGVyLmphdmE=) | `0.00% <0.00%> (-60.22%)` | :arrow_down: |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | | |
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | | |
   | [...a/org/apache/hudi/utilities/sources/SqlSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvU3FsU291cmNlLmphdmE=) | | |
   | [...s/exception/HoodieIncrementalPullSQLException.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2V4Y2VwdGlvbi9Ib29kaWVJbmNyZW1lbnRhbFB1bGxTUUxFeGNlcHRpb24uamF2YQ==) | | |
   | [...che/hudi/utilities/schema/SchemaPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQb3N0UHJvY2Vzc29yLmphdmE=) | | |
   | [.../utilities/transform/SqlQueryBasedTransformer.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3RyYW5zZm9ybS9TcWxRdWVyeUJhc2VkVHJhbnNmb3JtZXIuamF2YQ==) | | |
   | [...g/apache/hudi/utilities/schema/SchemaProvider.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlci5qYXZh) | | |
   | [.../hudi/utilities/schema/RowBasedSchemaProvider.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9Sb3dCYXNlZFNjaGVtYVByb3ZpZGVyLmphdmE=) | | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | | |
   | ... and [512 more](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r565759271



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off(default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {
+      cleanerPool.get().submit(() -> cleanTask(file));
+    } else {
+      cleanTask(file);
+    }
+  }
+
+  abstract void cleanTask(String file);
+
+  private static class FileSourceRemover extends FileSourceCleaner {
+    private final FileSystem fs;
+    public FileSourceRemover(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info(String.format("Removing %s...", file));
+      try {
+        if (fs.delete(new Path(file), false)) {
+          LOG.info(String.format("Successfully remove up %s", file));
+        } else {
+          LOG.warn(String.format("Failed to remove %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to remove %s", file), e);
+      }
+    }
+  }
+
+  private static class FileSourceArchiver extends FileSourceCleaner {
+    private final FileSystem fs;
+    private final Path archiveDir;
+    private final Path sourceRootDir;
+
+    public FileSourceArchiver(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+      this.archiveDir = new Path(props.getString(Config.FILE_SOURCE_ARCHIVE_DIR_KEY));
+      this.sourceRootDir = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+      ValidationUtils.checkArgument(!isSubDir(archiveDir, sourceRootDir),
+          String.format("%s must not be child of %s", Config.FILE_SOURCE_ARCHIVE_DIR_KEY, ROOT_INPUT_PATH_PROP));
+    }
+
+    private boolean isSubDir(Path childDir, Path parentDir) {
+      while (childDir != null) {
+        if (childDir.equals(parentDir)) {
+          return true;
+        }
+        childDir = childDir.getParent();
+      }
+      return false;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      try {
+        final Path original = new Path(file);
+        final Path fileDir = original.getParent();
+        Path relativeDir = getRelativeDir(fileDir, sourceRootDir);
+        final Path newDir = new Path(archiveDir, relativeDir);
+        LOG.info("Creating directory if not existent: " + newDir.toString());
+        fs.mkdirs(newDir);
+
+        final Path newFile = new Path(newDir, original.getName());

Review comment:
       minor: maybe we could name this "archived_file"

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off(default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {
+      cleanerPool.get().submit(() -> cleanTask(file));
+    } else {
+      cleanTask(file);
+    }
+  }
+
+  abstract void cleanTask(String file);
+
+  private static class FileSourceRemover extends FileSourceCleaner {
+    private final FileSystem fs;
+    public FileSourceRemover(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info(String.format("Removing %s...", file));
+      try {
+        if (fs.delete(new Path(file), false)) {
+          LOG.info(String.format("Successfully remove up %s", file));
+        } else {
+          LOG.warn(String.format("Failed to remove %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to remove %s", file), e);
+      }
+    }
+  }
+
+  private static class FileSourceArchiver extends FileSourceCleaner {
+    private final FileSystem fs;
+    private final Path archiveDir;
+    private final Path sourceRootDir;
+
+    public FileSourceArchiver(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+      this.archiveDir = new Path(props.getString(Config.FILE_SOURCE_ARCHIVE_DIR_KEY));
+      this.sourceRootDir = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+      ValidationUtils.checkArgument(!isSubDir(archiveDir, sourceRootDir),
+          String.format("%s must not be child of %s", Config.FILE_SOURCE_ARCHIVE_DIR_KEY, ROOT_INPUT_PATH_PROP));
+    }
+
+    private boolean isSubDir(Path childDir, Path parentDir) {
+      while (childDir != null) {
+        if (childDir.equals(parentDir)) {
+          return true;
+        }
+        childDir = childDir.getParent();
+      }
+      return false;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      try {
+        final Path original = new Path(file);
+        final Path fileDir = original.getParent();
+        Path relativeDir = getRelativeDir(fileDir, sourceRootDir);
+        final Path newDir = new Path(archiveDir, relativeDir);
+        LOG.info("Creating directory if not existent: " + newDir.toString());
+        fs.mkdirs(newDir);

Review comment:
       if (!fs.exists(newDir)) {
         fs.mkdirs(newDir);
       }

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
##########
@@ -125,4 +134,12 @@ public CsvDFSSource(TypedProperties props,
       return Option.empty();
     }
   }
+
+  @Override

Review comment:
       I understand that, at least for Kafka, this may not be applicable. But since we repeat this for almost all sources, can we move it to the Source class itself? Each impl would then only need to update the inputFiles list; the rest would be taken care of by Source.postCommit(). This also means you would need to declare fileCleaner and the input file list in the Source class.
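
       A rough sketch of that refactor (field names like inputFiles and the generic Source shape are assumptions for illustration, not the PR's actual code):

           import java.util.ArrayList;
           import java.util.List;

           public abstract class Source<T> {
             // Assumed fields, declared once in the base class instead of per source.
             protected FileSourceCleaner fileCleaner;
             protected final List<String> inputFiles = new ArrayList<>();

             // Each DFS-backed impl only appends the files it ingested to inputFiles;
             // the base class drives the cleanup after a successful commit.
             public void postCommit() {
               inputFiles.forEach(fileCleaner::clean);
               inputFiles.clear();
             }
           }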
   

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off(default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {
+      cleanerPool.get().submit(() -> cleanTask(file));
+    } else {
+      cleanTask(file);
+    }
+  }
+
+  abstract void cleanTask(String file);
+
+  private static class FileSourceRemover extends FileSourceCleaner {
+    private final FileSystem fs;
+    public FileSourceRemover(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info(String.format("Removing %s...", file));
+      try {
+        if (fs.delete(new Path(file), false)) {
+          LOG.info(String.format("Successfully remove up %s", file));
+        } else {
+          LOG.warn(String.format("Failed to remove %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to remove %s", file), e);
+      }
+    }
+  }
+
+  private static class FileSourceArchiver extends FileSourceCleaner {
+    private final FileSystem fs;
+    private final Path archiveDir;
+    private final Path sourceRootDir;
+
+    public FileSourceArchiver(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+      this.archiveDir = new Path(props.getString(Config.FILE_SOURCE_ARCHIVE_DIR_KEY));
+      this.sourceRootDir = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+      ValidationUtils.checkArgument(!isSubDir(archiveDir, sourceRootDir),
+          String.format("%s must not be child of %s", Config.FILE_SOURCE_ARCHIVE_DIR_KEY, ROOT_INPUT_PATH_PROP));
+    }
+
+    private boolean isSubDir(Path childDir, Path parentDir) {
+      while (childDir != null) {
+        if (childDir.equals(parentDir)) {
+          return true;
+        }
+        childDir = childDir.getParent();
+      }
+      return false;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      try {
+        final Path original = new Path(file);
+        final Path fileDir = original.getParent();
+        Path relativeDir = getRelativeDir(fileDir, sourceRootDir);
+        final Path newDir = new Path(archiveDir, relativeDir);
+        LOG.info("Creating directory if not existent: " + newDir.toString());
+        fs.mkdirs(newDir);
+
+        final Path newFile = new Path(newDir, original.getName());
+        LOG.info(String.format("Renaming: %s to %s", original.toString(), newFile));
+        if (fs.rename(original, newFile)) {
+          LOG.info(String.format("Successfully archive %s", file));
+        } else {
+          LOG.warn(String.format("Failed to archive %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to archive %s", file), e);
+      }
+    }
+
+    private Path getRelativeDir(Path childPath, Path parentPath) {
+      LinkedList<String> paths = new LinkedList<>();
+      while (childPath != null && !childPath.equals(parentPath)) {
+        paths.addFirst(childPath.getName());
+        childPath = childPath.getParent();
+      }
+      return new Path(paths.isEmpty() ? "." : String.join("/", paths));
+    }
+  }
+
+  private static class FileSourceCleanerNoOp extends FileSourceCleaner {

Review comment:
       @vinothchandar / @bvaradar : What is a good way to go about a no-op impl in general? Can we leave fileCleaner as null and call fileCleaner.cleanTask() only when it is non-null at the call sites? Or keep this class and simply do nothing inside cleanTask()?
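
       For comparison, a minimal sketch of the two alternatives (the call-site shape is assumed):

           // Alternative 1: nullable field, guarded call sites.
           if (fileCleaner != null) {
             fileCleaner.clean(file);
           }

           // Alternative 2: null-object pattern, as in this PR. Call sites stay
           // unconditional because FileSourceCleanerNoOp.cleanTask() does nothing.
           fileCleaner.clean(file);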

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,

Review comment:
       I know it's self-explanatory, but a one-line Javadoc would be nice.
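
       For instance (the wording is just a suggestion):

           private enum CleanMode {
             /** Delete source files once they have been ingested. */
             DELETE,
             /** Move ingested source files into the configured archive directory. */
             ARCHIVE,
             /** Leave source files untouched (default). */
             OFF
           }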

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off(default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {
+      cleanerPool.get().submit(() -> cleanTask(file));
+    } else {
+      cleanTask(file);
+    }
+  }
+
+  abstract void cleanTask(String file);
+
+  private static class FileSourceRemover extends FileSourceCleaner {
+    private final FileSystem fs;
+    public FileSourceRemover(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info(String.format("Removing %s...", file));
+      try {
+        if (fs.delete(new Path(file), false)) {
+          LOG.info(String.format("Successfully remove up %s", file));
+        } else {
+          LOG.warn(String.format("Failed to remove %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to remove %s", file), e);
+      }
+    }
+  }
+
+  private static class FileSourceArchiver extends FileSourceCleaner {
+    private final FileSystem fs;

Review comment:
       If fileSystem is used in all cleaner impls, we might as well declare it in the base class.
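
       A sketch of that refactor (the constructor shape is assumed):

           public abstract class FileSourceCleaner {
             // Shared by all cleaner impls, including the no-op one.
             protected final FileSystem fs;

             protected FileSourceCleaner(TypedProperties props, FileSystem fs) {
               this.fs = fs;
               // ... existing cleaner-pool setup ...
             }
           }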

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off(default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {
+      cleanerPool.get().submit(() -> cleanTask(file));
+    } else {
+      cleanTask(file);
+    }
+  }
+
+  abstract void cleanTask(String file);
+
+  private static class FileSourceRemover extends FileSourceCleaner {
+    private final FileSystem fs;
+    public FileSourceRemover(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info(String.format("Removing %s...", file));
+      try {
+        if (fs.delete(new Path(file), false)) {
+          LOG.info(String.format("Successfully remove up %s", file));
+        } else {
+          LOG.warn(String.format("Failed to remove %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to remove %s", file), e);
+      }
+    }
+  }
+
+  private static class FileSourceArchiver extends FileSourceCleaner {
+    private final FileSystem fs;
+    private final Path archiveDir;
+    private final Path sourceRootDir;
+
+    public FileSourceArchiver(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+      this.archiveDir = new Path(props.getString(Config.FILE_SOURCE_ARCHIVE_DIR_KEY));
+      this.sourceRootDir = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+      ValidationUtils.checkArgument(!isSubDir(archiveDir, sourceRootDir),
+          String.format("%s must not be child of %s", Config.FILE_SOURCE_ARCHIVE_DIR_KEY, ROOT_INPUT_PATH_PROP));
+    }
+
+    private boolean isSubDir(Path childDir, Path parentDir) {
+      while (childDir != null) {
+        if (childDir.equals(parentDir)) {
+          return true;
+        }
+        childDir = childDir.getParent();
+      }
+      return false;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      try {
+        final Path original = new Path(file);
+        final Path fileDir = original.getParent();
+        Path relativeDir = getRelativeDir(fileDir, sourceRootDir);
+        final Path newDir = new Path(archiveDir, relativeDir);
+        LOG.info("Creating directory if not existent: " + newDir.toString());
+        fs.mkdirs(newDir);
+
+        final Path newFile = new Path(newDir, original.getName());
+        LOG.info(String.format("Renaming: %s to %s", original.toString(), newFile));
+        if (fs.rename(original, newFile)) {
+          LOG.info(String.format("Successfully archive %s", file));
+        } else {
+          LOG.warn(String.format("Failed to archive %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to archive %s", file), e);
+      }
+    }
+
+    private Path getRelativeDir(Path childPath, Path parentPath) {
+      LinkedList<String> paths = new LinkedList<>();
+      while (childPath != null && !childPath.equals(parentPath)) {
+        paths.addFirst(childPath.getName());
+        childPath = childPath.getParent();
+      }
+      return new Path(paths.isEmpty() ? "." : String.join("/", paths));
+    }
+  }
+
+  private static class FileSourceCleanerNoOp extends FileSourceCleaner {

Review comment:
       What do you think of "FileSourceNoOpCleaner" instead of FileSourceCleanerNoOp?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off(default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {
+      cleanerPool.get().submit(() -> cleanTask(file));
+    } else {
+      cleanTask(file);
+    }
+  }
+
+  abstract void cleanTask(String file);
+
+  private static class FileSourceRemover extends FileSourceCleaner {
+    private final FileSystem fs;
+    public FileSourceRemover(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info(String.format("Removing %s...", file));
+      try {
+        if (fs.delete(new Path(file), false)) {
+          LOG.info(String.format("Successfully remove up %s", file));
+        } else {
+          LOG.warn(String.format("Failed to remove %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to remove %s", file), e);
+      }
+    }
+  }
+
+  private static class FileSourceArchiver extends FileSourceCleaner {
+    private final FileSystem fs;
+    private final Path archiveDir;
+    private final Path sourceRootDir;
+
+    public FileSourceArchiver(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+      this.archiveDir = new Path(props.getString(Config.FILE_SOURCE_ARCHIVE_DIR_KEY));
+      this.sourceRootDir = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+      ValidationUtils.checkArgument(!isSubDir(archiveDir, sourceRootDir),
+          String.format("%s must not be child of %s", Config.FILE_SOURCE_ARCHIVE_DIR_KEY, ROOT_INPUT_PATH_PROP));
+    }
+
+    private boolean isSubDir(Path childDir, Path parentDir) {
+      while (childDir != null) {
+        if (childDir.equals(parentDir)) {
+          return true;
+        }
+        childDir = childDir.getParent();
+      }
+      return false;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      try {
+        final Path original = new Path(file);
+        final Path fileDir = original.getParent();
+        Path relativeDir = getRelativeDir(fileDir, sourceRootDir);
+        final Path newDir = new Path(archiveDir, relativeDir);
+        LOG.info("Creating directory if not existent: " + newDir.toString());
+        fs.mkdirs(newDir);
+
+        final Path newFile = new Path(newDir, original.getName());
+        LOG.info(String.format("Renaming: %s to %s", original.toString(), newFile));
+        if (fs.rename(original, newFile)) {
+          LOG.info(String.format("Successfully archive %s", file));
+        } else {
+          LOG.warn(String.format("Failed to archive %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to archive %s", file), e);
+      }
+    }
+
+    private Path getRelativeDir(Path childPath, Path parentPath) {
+      LinkedList<String> paths = new LinkedList<>();
+      while (childPath != null && !childPath.equals(parentPath)) {
+        paths.addFirst(childPath.getName());
+        childPath = childPath.getParent();
+      }
+      return new Path(paths.isEmpty() ? "." : String.join("/", paths));
+    }
+  }
+
+  private static class FileSourceCleanerNoOp extends FileSourceCleaner {
+    protected FileSourceCleanerNoOp(TypedProperties props) {
+      super(props);
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info("No hoodie.deltastreamer.source.dfs.clean was specified. Leaving source unchanged.");

Review comment:
       Do we need this info logging? I feel we can leave it empty.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off(default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {

Review comment:
       Do you think it would be unwarranted to check that "file" is actually a file and not a dir? If it is a dir, we could throw an exception or something.
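
       A minimal sketch of such a guard at the top of clean(), assuming fs is visible there (getFileStatus and isDirectory are standard Hadoop FileSystem API):

           final Path path = new Path(file);
           // getFileStatus throws IOException, so wrap or declare it as appropriate.
           if (fs.getFileStatus(path).isDirectory()) {
             throw new IllegalArgumentException(
                 String.format("Expected a file but got a directory: %s", file));
           }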

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {

Review comment:
       Please add Javadocs for this class.
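
       For example (wording assumed):

           /**
            * Cleans up DFS source files that have been successfully ingested,
            * either by deleting them or by archiving them to a separate directory.
            */
           public abstract class FileSourceCleaner {
             // ...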

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off(default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {
+      cleanerPool.get().submit(() -> cleanTask(file));
+    } else {
+      cleanTask(file);
+    }
+  }
+
+  abstract void cleanTask(String file);
+
+  private static class FileSourceRemover extends FileSourceCleaner {
+    private final FileSystem fs;
+    public FileSourceRemover(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info(String.format("Removing %s...", file));
+      try {
+        if (fs.delete(new Path(file), false)) {
+          LOG.info(String.format("Successfully remove up %s", file));
+        } else {
+          LOG.warn(String.format("Failed to remove %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to remove %s", file), e);
+      }
+    }
+  }
+
+  private static class FileSourceArchiver extends FileSourceCleaner {
+    private final FileSystem fs;
+    private final Path archiveDir;
+    private final Path sourceRootDir;
+
+    public FileSourceArchiver(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+      this.archiveDir = new Path(props.getString(Config.FILE_SOURCE_ARCHIVE_DIR_KEY));
+      this.sourceRootDir = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+      ValidationUtils.checkArgument(!isSubDir(archiveDir, sourceRootDir),
+          String.format("%s must not be child of %s", Config.FILE_SOURCE_ARCHIVE_DIR_KEY, ROOT_INPUT_PATH_PROP));
+    }
+
+    private boolean isSubDir(Path childDir, Path parentDir) {
+      while (childDir != null) {
+        if (childDir.equals(parentDir)) {
+          return true;
+        }
+        childDir = childDir.getParent();
+      }
+      return false;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      try {
+        final Path original = new Path(file);
+        final Path fileDir = original.getParent();
+        Path relativeDir = getRelativeDir(fileDir, sourceRootDir);
+        final Path newDir = new Path(archiveDir, relativeDir);
+        LOG.info("Creating directory if not existent: " + newDir.toString());
+        fs.mkdirs(newDir);
+
+        final Path newFile = new Path(newDir, original.getName());
+        LOG.info(String.format("Renaming: %s to %s", original.toString(), newFile));
+        if (fs.rename(original, newFile)) {
+          LOG.info(String.format("Successfully archive %s", file));
+        } else {
+          LOG.warn(String.format("Failed to archive %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to archive %s", file), e);
+      }
+    }
+
+    private Path getRelativeDir(Path childPath, Path parentPath) {
+      LinkedList<String> paths = new LinkedList<>();
+      while (childPath != null && !childPath.equals(parentPath)) {
+        paths.addFirst(childPath.getName());
+        childPath = childPath.getParent();
+      }
+      return new Path(paths.isEmpty() ? "." : String.join("/", paths));

Review comment:
       If the file to be archived is in the source root dir itself, this moves it to "archive_folder/./", right? May I know the rationale?
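
       Tracing getRelativeDir for that case (paths are illustrative):

           // file        = /source/root/data.csv
           // fileDir     = /source/root               -> equals sourceRootDir, so the loop body never runs
           // relativeDir = new Path(".")              -> because the paths list is empty
           // newDir      = new Path(archiveDir, ".")  -> the "archive_folder/./" noted above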

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {

Review comment:
       Just trying to understand: is cleaning applicable only to files? I am thinking whether we should name this SourceCleaner instead of FileSourceCleaner and have three cleaners: one for file deletion, one for file archiving, and a no-op one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hotienvu commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hotienvu commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r567407991



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off(default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {
+      cleanerPool.get().submit(() -> cleanTask(file));
+    } else {
+      cleanTask(file);
+    }
+  }
+
+  abstract void cleanTask(String file);
+
+  private static class FileSourceRemover extends FileSourceCleaner {
+    private final FileSystem fs;
+    public FileSourceRemover(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info(String.format("Removing %s...", file));
+      try {
+        if (fs.delete(new Path(file), false)) {
+          LOG.info(String.format("Successfully remove up %s", file));
+        } else {
+          LOG.warn(String.format("Failed to remove %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to remove %s", file), e);
+      }
+    }
+  }
+
+  private static class FileSourceArchiver extends FileSourceCleaner {
+    private final FileSystem fs;
+    private final Path archiveDir;
+    private final Path sourceRootDir;
+
+    public FileSourceArchiver(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+      this.archiveDir = new Path(props.getString(Config.FILE_SOURCE_ARCHIVE_DIR_KEY));
+      this.sourceRootDir = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+      ValidationUtils.checkArgument(!isSubDir(archiveDir, sourceRootDir),
+          String.format("%s must not be child of %s", Config.FILE_SOURCE_ARCHIVE_DIR_KEY, ROOT_INPUT_PATH_PROP));
+    }
+
+    private boolean isSubDir(Path childDir, Path parentDir) {
+      while (childDir != null) {
+        if (childDir.equals(parentDir)) {
+          return true;
+        }
+        childDir = childDir.getParent();
+      }
+      return false;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      try {
+        final Path original = new Path(file);
+        final Path fileDir = original.getParent();
+        Path relativeDir = getRelativeDir(fileDir, sourceRootDir);
+        final Path newDir = new Path(archiveDir, relativeDir);
+        LOG.info("Creating directory if not existent: " + newDir.toString());
+        fs.mkdirs(newDir);
+
+        final Path newFile = new Path(newDir, original.getName());
+        LOG.info(String.format("Renaming: %s to %s", original.toString(), newFile));
+        if (fs.rename(original, newFile)) {
+          LOG.info(String.format("Successfully archive %s", file));
+        } else {
+          LOG.warn(String.format("Failed to archive %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to archive %s", file), e);
+      }
+    }
+
+    private Path getRelativeDir(Path childPath, Path parentPath) {
+      LinkedList<String> paths = new LinkedList<>();
+      while (childPath != null && !childPath.equals(parentPath)) {
+        paths.addFirst(childPath.getName());
+        childPath = childPath.getParent();
+      }
+      return new Path(paths.isEmpty() ? "." : String.join("/", paths));
+    }
+  }
+
+  private static class FileSourceCleanerNoOp extends FileSourceCleaner {
+    protected FileSourceCleanerNoOp(TypedProperties props) {
+      super(props);
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info("No hoodie.deltastreamer.source.dfs.clean was specified. Leaving source unchanged.");

Review comment:
       I personally think it helps with debugging, unless there's a reason not to.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r567428447



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off (default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {
+      cleanerPool.get().submit(() -> cleanTask(file));
+    } else {
+      cleanTask(file);
+    }
+  }
+
+  abstract void cleanTask(String file);
+
+  private static class FileSourceRemover extends FileSourceCleaner {
+    private final FileSystem fs;
+    public FileSourceRemover(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info(String.format("Removing %s...", file));
+      try {
+        if (fs.delete(new Path(file), false)) {
+          LOG.info(String.format("Successfully remove up %s", file));
+        } else {
+          LOG.warn(String.format("Failed to remove %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to remove %s", file), e);
+      }
+    }
+  }
+
+  private static class FileSourceArchiver extends FileSourceCleaner {
+    private final FileSystem fs;
+    private final Path archiveDir;
+    private final Path sourceRootDir;
+
+    public FileSourceArchiver(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+      this.archiveDir = new Path(props.getString(Config.FILE_SOURCE_ARCHIVE_DIR_KEY));
+      this.sourceRootDir = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+      ValidationUtils.checkArgument(!isSubDir(archiveDir, sourceRootDir),
+          String.format("%s must not be child of %s", Config.FILE_SOURCE_ARCHIVE_DIR_KEY, ROOT_INPUT_PATH_PROP));
+    }
+
+    private boolean isSubDir(Path childDir, Path parentDir) {
+      while (childDir != null) {
+        if (childDir.equals(parentDir)) {
+          return true;
+        }
+        childDir = childDir.getParent();
+      }
+      return false;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      try {
+        final Path original = new Path(file);
+        final Path fileDir = original.getParent();
+        Path relativeDir = getRelativeDir(fileDir, sourceRootDir);
+        final Path newDir = new Path(archiveDir, relativeDir);
+        LOG.info("Creating directory if not existent: " + newDir.toString());
+        fs.mkdirs(newDir);
+
+        final Path newFile = new Path(newDir, original.getName());
+        LOG.info(String.format("Renaming: %s to %s", original.toString(), newFile));
+        if (fs.rename(original, newFile)) {
+          LOG.info(String.format("Successfully archive %s", file));
+        } else {
+          LOG.warn(String.format("Failed to archive %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to archive %s", file), e);
+      }
+    }
+
+    private Path getRelativeDir(Path childPath, Path parentPath) {
+      LinkedList<String> paths = new LinkedList<>();
+      while (childPath != null && !childPath.equals(parentPath)) {
+        paths.addFirst(childPath.getName());
+        childPath = childPath.getParent();
+      }
+      return new Path(paths.isEmpty() ? "." : String.join("/", paths));

Review comment:
       Yes, but I am trying to understand why the file is moved to the "archive_folder/./" folder rather than "archive_folder/". Can you please help me understand?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hotienvu commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hotienvu commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-769038357


   @pratyakshsharma @nsivabalan thanks for the feedback, I will resolve it as early as I can.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-773247045


   @hotienvu: are there any comments to be addressed? If everything is good, I can go ahead and land it.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hotienvu commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hotienvu commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r567407168



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    DELETE,
+    ARCHIVE,
+    OFF
+  }
+
+  private final Option<ExecutorService> cleanerPool;
+
+  protected FileSourceCleaner(TypedProperties props) {
+    int numCleanerThreads = props.getInteger(Config.FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY,
+        Config.DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL);
+    cleanerPool = (numCleanerThreads > 0) ? Option.of(Executors.newFixedThreadPool(numCleanerThreads)) : Option.empty();
+  }
+
+  /**
+   * Factory method to create FileSourceCleaner based on properties.
+   */
+  public static FileSourceCleaner create(TypedProperties props, FileSystem fs) {
+    final String cleanMode = props.getString(Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY, Config.DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY);
+    switch (CleanMode.valueOf(cleanMode.toUpperCase())) {
+      case DELETE:
+        return new FileSourceRemover(props, fs);
+      case ARCHIVE:
+        return new FileSourceArchiver(props, fs);
+      case OFF:
+        return new FileSourceCleanerNoOp(props);
+      default:
+        throw new IllegalArgumentException(String.format("Unknown option %s for %s. Available options are: "
+            + "delete, archive, off (default)", cleanMode, Config.FILE_SOURCE_CLEAN_MODE_OPT_KEY));
+    }
+  }
+
+  /**
+   * Clean up a file that has been ingested successfully.
+   */
+  public void clean(String file) {
+    if (cleanerPool.isPresent()) {
+      cleanerPool.get().submit(() -> cleanTask(file));
+    } else {
+      cleanTask(file);
+    }
+  }
+
+  abstract void cleanTask(String file);
+
+  private static class FileSourceRemover extends FileSourceCleaner {
+    private final FileSystem fs;
+    public FileSourceRemover(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      LOG.info(String.format("Removing %s...", file));
+      try {
+        if (fs.delete(new Path(file), false)) {
+          LOG.info(String.format("Successfully remove up %s", file));
+        } else {
+          LOG.warn(String.format("Failed to remove %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to remove %s", file), e);
+      }
+    }
+  }
+
+  private static class FileSourceArchiver extends FileSourceCleaner {
+    private final FileSystem fs;
+    private final Path archiveDir;
+    private final Path sourceRootDir;
+
+    public FileSourceArchiver(TypedProperties props, FileSystem fs) {
+      super(props);
+      this.fs = fs;
+      this.archiveDir = new Path(props.getString(Config.FILE_SOURCE_ARCHIVE_DIR_KEY));
+      this.sourceRootDir = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+      ValidationUtils.checkArgument(!isSubDir(archiveDir, sourceRootDir),
+          String.format("%s must not be child of %s", Config.FILE_SOURCE_ARCHIVE_DIR_KEY, ROOT_INPUT_PATH_PROP));
+    }
+
+    private boolean isSubDir(Path childDir, Path parentDir) {
+      while (childDir != null) {
+        if (childDir.equals(parentDir)) {
+          return true;
+        }
+        childDir = childDir.getParent();
+      }
+      return false;
+    }
+
+    @Override
+    void cleanTask(String file) {
+      try {
+        final Path original = new Path(file);
+        final Path fileDir = original.getParent();
+        Path relativeDir = getRelativeDir(fileDir, sourceRootDir);
+        final Path newDir = new Path(archiveDir, relativeDir);
+        LOG.info("Creating directory if not existent: " + newDir.toString());
+        fs.mkdirs(newDir);
+
+        final Path newFile = new Path(newDir, original.getName());
+        LOG.info(String.format("Renaming: %s to %s", original.toString(), newFile));
+        if (fs.rename(original, newFile)) {
+          LOG.info(String.format("Successfully archive %s", file));
+        } else {
+          LOG.warn(String.format("Failed to archive %s", file));
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Failed to archive %s", file), e);
+      }
+    }
+
+    private Path getRelativeDir(Path childPath, Path parentPath) {
+      LinkedList<String> paths = new LinkedList<>();
+      while (childPath != null && !childPath.equals(parentPath)) {
+        paths.addFirst(childPath.getName());
+        childPath = childPath.getParent();
+      }
+      return new Path(paths.isEmpty() ? "." : String.join("/", paths));

Review comment:
       The files are archived to the new location relative to the source root directory, e.g. src/foo/x => archive/foo/x. So yes, if a file is in the source root dir itself, it will be moved to the archive root dir.
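
       To make the mapping concrete, here is a minimal, self-contained sketch of the getRelativeDir logic quoted above (the class name and the main driver are mine, for illustration only; the note on the "." case is my reading of Hadoop's Path(parent, child) URI-based resolution and is worth verifying on the target FileSystem):

   ```java
   import java.util.LinkedList;

   import org.apache.hadoop.fs.Path;

   public class RelativeDirSketch {
     // Same logic as getRelativeDir() in the diff above, copied for illustration.
     static Path getRelativeDir(Path childPath, Path parentPath) {
       LinkedList<String> paths = new LinkedList<>();
       while (childPath != null && !childPath.equals(parentPath)) {
         paths.addFirst(childPath.getName());
         childPath = childPath.getParent();
       }
       return new Path(paths.isEmpty() ? "." : String.join("/", paths));
     }

     public static void main(String[] args) {
       Path sourceRoot = new Path("/src");
       Path archiveDir = new Path("/archive");

       // Nested file dir: /src/foo -> relative dir "foo" -> archived under /archive/foo
       System.out.println(new Path(archiveDir, getRelativeDir(new Path("/src/foo"), sourceRoot)));

       // File directly under the root: relative dir is ".". Path(parent, child)
       // resolves via java.net.URI, which should drop the "." segment, yielding
       // /archive itself rather than a literal /archive/. path.
       System.out.println(new Path(archiveDir, getRelativeDir(new Path("/src"), sourceRoot)));
     }
   }
   ```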




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-961587498


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=210",
       "triggerID" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1668",
       "triggerID" : "a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "67dabdd51934a7141a299114de2b836b1f016fd5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=2166",
       "triggerID" : "67dabdd51934a7141a299114de2b836b1f016fd5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 67dabdd51934a7141a299114de2b836b1f016fd5 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=2166) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot removed a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hudi-bot removed a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-862028641


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=210",
       "triggerID" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1668",
       "triggerID" : "a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "67dabdd51934a7141a299114de2b836b1f016fd5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=2166",
       "triggerID" : "67dabdd51934a7141a299114de2b836b1f016fd5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 67dabdd51934a7141a299114de2b836b1f016fd5 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=2166) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] xushiyan commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r734982758



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+/**
+ * This class provides various clean-up strategies for DeltaStreamer when reading from DFS file sources.
+ * Each <code>*DFSSource</code> may invoke this to clean up/archive files after each successful commit.
+ *
+ */
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    /**
+     * Remove source files after each successful commit.
+     */
+    DELETE,
+    /**
+     * Move source files to specified archive directory after each successful commit.
+     * Used in conjunction with <code>hoodie.deltastreamer.source.dfs.clean.archiveDir</code>
+     */
+    ARCHIVE,
+    /**
+     * Default option. Do not clean up source files.
+     */
+    OFF
+  }

Review comment:
       @hotienvu @nsivabalan @vinothchandar I have a different view on supporting these features: we need to be extremely cautious when dealing with users' data. An unintended config in Hudi that leads to deleting users' data would be the worst case.
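
       One way to reduce that risk, sketched below with hypothetical names (none of this is in the PR), is to fail fast at startup on ambiguous or destructive configs instead of acting on them silently:

   ```java
   import org.apache.hudi.common.config.TypedProperties;
   import org.apache.hudi.common.util.ValidationUtils;

   // Hypothetical fail-fast validation run before any destructive clean-up starts.
   public final class CleanModeGuard {
     private CleanModeGuard() {}

     public static void validate(TypedProperties props) {
       String mode = props.getString("hoodie.deltastreamer.source.dfs.clean.mode", "off");
       if ("archive".equalsIgnoreCase(mode)) {
         // Archiving without a target directory should fail at startup, not mid-ingest.
         ValidationUtils.checkArgument(
             props.containsKey("hoodie.deltastreamer.source.dfs.clean.archiveDir"),
             "archive mode requires hoodie.deltastreamer.source.dfs.clean.archiveDir");
       }
       if ("delete".equalsIgnoreCase(mode)) {
         // Hypothetical explicit acknowledgement flag before deleting source data.
         ValidationUtils.checkArgument(
             props.getBoolean("hoodie.deltastreamer.source.dfs.clean.delete.ack", false),
             "delete mode removes source files; set the ack flag to confirm");
       }
     }
   }
   ```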
   

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
##########
@@ -79,6 +79,15 @@ protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSess
         : new InputBatch<>(batch.getBatch(), batch.getCheckpointForNextBatch(), overriddenSchemaProvider);
   }
 
+  /**
+   * Called after a new batch is committed successfully. Can be used to clean up source if necessary.
+   *
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public void postCommit() {
+    // no-op
+  }
+

Review comment:
       big +1 to this API, definitely comes in handy. 
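
       For illustration, a minimal sketch of the bookkeeping a DFS source could do around this hook (the holder class, method names, and file tracking are assumptions, not the PR's exact wiring; only FileSourceCleaner.create() and clean() come from the diff):

   ```java
   import java.util.ArrayList;
   import java.util.List;

   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hudi.common.config.TypedProperties;
   import org.apache.hudi.utilities.sources.helpers.FileSourceCleaner;

   // Hypothetical helper showing how a DFS source could track the files served in
   // the last batch and hand them to the cleaner once postCommit() fires.
   public class PostCommitCleanupSketch {
     private final FileSourceCleaner cleaner;
     private final List<String> filesInLastBatch = new ArrayList<>();

     public PostCommitCleanupSketch(TypedProperties props, FileSystem fs) {
       this.cleaner = FileSourceCleaner.create(props, fs);
     }

     // A source would call this while listing/reading files for a batch (assumption).
     void recordIngestedFile(String path) {
       filesInLastBatch.add(path);
     }

     // Mirrors the Source#postCommit() contract: invoked only after a successful
     // commit, so it is safe to delete/archive exactly what was just ingested.
     public void postCommit() {
       filesInLastBatch.forEach(cleaner::clean);
       filesInLastBatch.clear();
     }
   }
   ```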

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+/**
+ * This class provides various clean-up strategies for DeltaStreamer when reading from DFS file sources.
+ * Each <code>*DFSSource</code> may invoke this to clean up/archive files after each successful commit.
+ *
+ */
+public abstract class FileSourceCleaner {
+  private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class);
+
+  /**
+   * Configs supported.
+   */
+  public static class Config {
+    private Config() {}
+
+    public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode";
+    public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name();
+
+    public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads";
+    public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1;
+
+    public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir";
+  }
+
+  private enum CleanMode {
+    /**
+     * Remove source files after each successful commit.
+     */
+    DELETE,
+    /**
+     * Move source files to specified archive directory after each successful commit.
+     * Used in conjunction with <code>hoodie.deltastreamer.source.dfs.clean.archiveDir</code>
+     */
+    ARCHIVE,
+    /**
+     * Default option. Do not clean up source files.
+     */
+    OFF
+  }

Review comment:
       I do think the postCommit() API is very useful. I'd suggest providing an abstract class, say `SourcePostCommitAction`, to be extended by users. In Hudi we may just ship a default no-op implementation. Users are free to implement clean-up, archiving, posting metrics, etc. We just need to instantiate user-defined classes via hoodie config, like the Transformer or Payload classes.
   
       I also think defining a set of metrics for DFSSource would be helpful, such as the number of files picked up, the total size of the files, and the file-scanning duration, to give users visibility when dealing with DFSSource implementation issues.
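
       A rough sketch of what such a pluggable action could look like (the class name follows the suggestion above; the config key and no-op subclass are assumptions, mirroring how Transformer classes are instantiated from config):

   ```java
   import java.util.List;

   import org.apache.hudi.common.config.TypedProperties;

   // Hypothetical user-extensible post-commit action, per the suggestion above.
   // Names and the config key are illustrative, not part of the PR.
   public abstract class SourcePostCommitAction {
     // Hypothetical config key used to instantiate a user-defined subclass via reflection.
     public static final String POST_COMMIT_ACTION_CLASS_KEY =
         "hoodie.deltastreamer.source.postcommit.action.class";

     protected final TypedProperties props;

     protected SourcePostCommitAction(TypedProperties props) {
       this.props = props;
     }

     /** Invoked once per successfully committed batch with the files it ingested. */
     public abstract void execute(List<String> ingestedFiles);

     /** Default no-op implementation that Hudi itself could ship. */
     public static class NoOp extends SourcePostCommitAction {
       public NoOp(TypedProperties props) {
         super(props);
       }

       @Override
       public void execute(List<String> ingestedFiles) {
         // intentionally empty: leave source files untouched
       }
     }
   }
   ```

       Keeping deletion/archiving out of the core defaults and behind a user-supplied class would make any destructive behavior strictly opt-in.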




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-773247778


   Also, I have triggered Travis CI again. I guess there is some flaky failure.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hudi-bot edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-862028641


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=210",
       "triggerID" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1668",
       "triggerID" : "a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "67dabdd51934a7141a299114de2b836b1f016fd5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=2166",
       "triggerID" : "67dabdd51934a7141a299114de2b836b1f016fd5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 67dabdd51934a7141a299114de2b836b1f016fd5 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=2166) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hudi-bot edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-862028641


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=210",
       "triggerID" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1668",
       "triggerID" : "a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b845e34d11e4e44e2b41e2089349baddc3a10b80 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=210) 
   * a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=1668) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-770377916


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=h1) Report
   > Merging [#2210](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=desc) (7338e25) into [master](https://codecov.io/gh/apache/hudi/commit/5d053b495b8cb44cce88a67e82cbdfdc3d8b3180?el=desc) (5d053b4) will **increase** coverage by `0.92%`.
   > The diff coverage is `79.64%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2210/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2210      +/-   ##
   ============================================
   + Coverage     49.78%   50.70%   +0.92%     
   - Complexity     3089     3140      +51     
   ============================================
     Files           430      431       +1     
     Lines         19566    19678     +112     
     Branches       2004     2015      +11     
   ============================================
   + Hits           9741     9978     +237     
   + Misses         9033     8890     -143     
   - Partials        792      810      +18     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `37.21% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudicommon | `51.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiflink | `33.03% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisparkdatasource | `69.51% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | huditimelineservice | `66.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiutilities | `70.07% <79.64%> (+8.21%)` | `0.00 <21.00> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...g/apache/hudi/utilities/sources/AvroDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb0RGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...i/utilities/sources/helpers/FileSourceCleaner.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9GaWxlU291cmNlQ2xlYW5lci5qYXZh) | `80.28% <80.28%> (ø)` | `9.00 <9.00> (?)` | |
   | [...apache/hudi/utilities/deltastreamer/DeltaSync.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvRGVsdGFTeW5jLmphdmE=) | `70.60% <100.00%> (+0.10%)` | `50.00 <0.00> (ø)` | |
   | [...i/utilities/deltastreamer/SourceFormatAdapter.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvU291cmNlRm9ybWF0QWRhcHRlci5qYXZh) | `87.17% <100.00%> (+0.69%)` | `12.00 <1.00> (+1.00)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `100.00% <100.00%> (ø)` | `13.00 <3.00> (+3.00)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `100.00% <100.00%> (ø)` | `7.00 <3.00> (+3.00)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `100.00% <100.00%> (ø)` | `8.00 <3.00> (+3.00)` | |
   | [...java/org/apache/hudi/utilities/sources/Source.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvU291cmNlLmphdmE=) | `88.23% <100.00%> (+0.73%)` | `6.00 <1.00> (+1.00)` | |
   | [...udi/utilities/sources/helpers/DFSPathSelector.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9ERlNQYXRoU2VsZWN0b3IuamF2YQ==) | `85.10% <100.00%> (+0.32%)` | `16.00 <1.00> (+1.00)` | |
   | ... and [5 more](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-commenter edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-862054362


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2210](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b845e34) into [master](https://codecov.io/gh/apache/hudi/commit/673d62f3c3ab07abb3fcd319607e657339bc0682?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (673d62f) will **increase** coverage by `44.07%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2210/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #2210       +/-   ##
   =============================================
   + Coverage      8.43%   52.51%   +44.07%     
   - Complexity       62     3664     +3602     
   =============================================
     Files            70      474      +404     
     Lines          2880    23997    +21117     
     Branches        359     2741     +2382     
   =============================================
   + Hits            243    12601    +12358     
   - Misses         2616    10137     +7521     
   - Partials         21     1259     +1238     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | hudicli | `39.95% <ø> (?)` | |
   | hudiclient | `∅ <ø> (∅)` | |
   | hudicommon | `48.20% <ø> (?)` | |
   | hudiflink | `60.73% <ø> (?)` | |
   | hudihadoopmr | `51.34% <ø> (?)` | |
   | hudisparkdatasource | `66.47% <ø> (?)` | |
   | hudisync | `46.79% <ø> (+40.00%)` | :arrow_up: |
   | huditimelineservice | `64.36% <ø> (?)` | |
   | hudiutilities | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ache/hudi/hive/HiveMetastoreBasedLockProvider.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS1zeW5jL2h1ZGktaGl2ZS1zeW5jL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL2hpdmUvSGl2ZU1ldGFzdG9yZUJhc2VkTG9ja1Byb3ZpZGVyLmphdmE=) | `0.00% <0.00%> (-60.22%)` | :arrow_down: |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | | |
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | | |
   | [...a/org/apache/hudi/utilities/sources/SqlSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvU3FsU291cmNlLmphdmE=) | | |
   | [...s/exception/HoodieIncrementalPullSQLException.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2V4Y2VwdGlvbi9Ib29kaWVJbmNyZW1lbnRhbFB1bGxTUUxFeGNlcHRpb24uamF2YQ==) | | |
   | [...che/hudi/utilities/schema/SchemaPostProcessor.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQb3N0UHJvY2Vzc29yLmphdmE=) | | |
   | [.../utilities/transform/SqlQueryBasedTransformer.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3RyYW5zZm9ybS9TcWxRdWVyeUJhc2VkVHJhbnNmb3JtZXIuamF2YQ==) | | |
   | [...g/apache/hudi/utilities/schema/SchemaProvider.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFQcm92aWRlci5qYXZh) | | |
   | [.../hudi/utilities/schema/RowBasedSchemaProvider.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9Sb3dCYXNlZFNjaGVtYVByb3ZpZGVyLmphdmE=) | | |
   | [...apache/hudi/utilities/sources/JsonKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkthZmthU291cmNlLmphdmE=) | | |
   | ... and [512 more](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hotienvu commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hotienvu commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-773320271


   @nsivabalan there is an integration test timing out; I tried to dig deeper but haven't succeeded so far. I'm a bit busy this week, so I will try to look again this weekend.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
nsivabalan edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-784223991


   Update: I couldn't reproduce the CI failure locally. I need some more time to look into it. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-770377916


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=h1) Report
   > Merging [#2210](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=desc) (7338e25) into [master](https://codecov.io/gh/apache/hudi/commit/5d053b495b8cb44cce88a67e82cbdfdc3d8b3180?el=desc) (5d053b4) will **increase** coverage by `0.60%`.
   > The diff coverage is `79.64%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2210/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2210      +/-   ##
   ============================================
   + Coverage     49.78%   50.39%   +0.60%     
   + Complexity     3089     3080       -9     
   ============================================
     Files           430      425       -5     
     Lines         19566    19293     -273     
     Branches       2004     1998       -6     
   ============================================
   - Hits           9741     9722      -19     
   + Misses         9033     8775     -258     
   - Partials        792      796       +4     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `37.21% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiclient | `100.00% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudicommon | `51.49% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudiflink | `33.03% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudihadoopmr | `33.16% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisparkdatasource | `69.51% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | hudisync | `48.61% <ø> (ø)` | `0.00 <ø> (ø)` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `70.07% <79.64%> (+8.21%)` | `0.00 <21.00> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...g/apache/hudi/utilities/sources/AvroDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb0RGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...i/utilities/sources/helpers/FileSourceCleaner.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9GaWxlU291cmNlQ2xlYW5lci5qYXZh) | `80.28% <80.28%> (ø)` | `9.00 <9.00> (?)` | |
   | [...apache/hudi/utilities/deltastreamer/DeltaSync.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvRGVsdGFTeW5jLmphdmE=) | `70.60% <100.00%> (+0.10%)` | `50.00 <0.00> (ø)` | |
   | [...i/utilities/deltastreamer/SourceFormatAdapter.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvU291cmNlRm9ybWF0QWRhcHRlci5qYXZh) | `87.17% <100.00%> (+0.69%)` | `12.00 <1.00> (+1.00)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `100.00% <100.00%> (ø)` | `13.00 <3.00> (+3.00)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `100.00% <100.00%> (ø)` | `7.00 <3.00> (+3.00)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `100.00% <100.00%> (ø)` | `8.00 <3.00> (+3.00)` | |
   | [...java/org/apache/hudi/utilities/sources/Source.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvU291cmNlLmphdmE=) | `88.23% <100.00%> (+0.73%)` | `6.00 <1.00> (+1.00)` | |
   | [...udi/utilities/sources/helpers/DFSPathSelector.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9ERlNQYXRoU2VsZWN0b3IuamF2YQ==) | `85.10% <100.00%> (+0.32%)` | `16.00 <1.00> (+1.00)` | |
   | [...udi/timeline/service/handlers/BaseFileHandler.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS10aW1lbGluZS1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9odWRpL3RpbWVsaW5lL3NlcnZpY2UvaGFuZGxlcnMvQmFzZUZpbGVIYW5kbGVyLmphdmE=) | | | |
   | ... and [11 more](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r567427913



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {

Review comment:
       Sorry, was this addressed?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
##########
@@ -125,4 +134,12 @@ public CsvDFSSource(TypedProperties props,
       return Option.empty();
     }
   }
+
+  @Override

Review comment:
       Got it, thanks. Can you please file a JIRA and assign it to me? (shivnarayan is my handle)

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+
+public abstract class FileSourceCleaner {

Review comment:
       Got it, thanks.







[GitHub] [hudi] codecov-io commented on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-770377916


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=h1) Report
   > Merging [#2210](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=desc) (511231c) into [master](https://codecov.io/gh/apache/hudi/commit/5d053b495b8cb44cce88a67e82cbdfdc3d8b3180?el=desc) (5d053b4) will **increase** coverage by `20.38%`.
   > The diff coverage is `81.65%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2210/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #2210       +/-   ##
   =============================================
   + Coverage     49.78%   70.16%   +20.38%     
   + Complexity     3089      378     -2711     
   =============================================
     Files           430       54      -376     
     Lines         19566     2038    -17528     
     Branches       2004      240     -1764     
   =============================================
   - Hits           9741     1430     -8311     
   + Misses         9033      470     -8563     
   + Partials        792      138      -654     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `70.16% <81.65%> (+8.30%)` | `0.00 <21.00> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2210?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...g/apache/hudi/utilities/sources/AvroDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb0RGU1NvdXJjZS5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...i/utilities/sources/helpers/FileSourceCleaner.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9GaWxlU291cmNlQ2xlYW5lci5qYXZh) | `83.58% <83.58%> (ø)` | `9.00 <9.00> (?)` | |
   | [...apache/hudi/utilities/deltastreamer/DeltaSync.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvRGVsdGFTeW5jLmphdmE=) | `70.60% <100.00%> (+0.10%)` | `50.00 <0.00> (ø)` | |
   | [...i/utilities/deltastreamer/SourceFormatAdapter.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvU291cmNlRm9ybWF0QWRhcHRlci5qYXZh) | `87.17% <100.00%> (+0.69%)` | `12.00 <1.00> (+1.00)` | |
   | [...rg/apache/hudi/utilities/sources/CsvDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQ3N2REZTU291cmNlLmphdmE=) | `100.00% <100.00%> (ø)` | `13.00 <3.00> (+3.00)` | |
   | [...g/apache/hudi/utilities/sources/JsonDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvbkRGU1NvdXJjZS5qYXZh) | `100.00% <100.00%> (ø)` | `7.00 <3.00> (+3.00)` | |
   | [...pache/hudi/utilities/sources/ParquetDFSSource.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUGFycXVldERGU1NvdXJjZS5qYXZh) | `100.00% <100.00%> (ø)` | `8.00 <3.00> (+3.00)` | |
   | [...java/org/apache/hudi/utilities/sources/Source.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvU291cmNlLmphdmE=) | `88.23% <100.00%> (+0.73%)` | `6.00 <1.00> (+1.00)` | |
   | [...udi/utilities/sources/helpers/DFSPathSelector.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9ERlNQYXRoU2VsZWN0b3IuamF2YQ==) | `85.10% <100.00%> (+0.32%)` | `16.00 <1.00> (+1.00)` | |
   | [...apache/hudi/cli/HoodieHistoryFileNameProvider.java](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree#diff-aHVkaS1jbGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpL0hvb2RpZUhpc3RvcnlGaWxlTmFtZVByb3ZpZGVyLmphdmE=) | | | |
   | ... and [382 more](https://codecov.io/gh/apache/hudi/pull/2210/diff?src=pr&el=tree-more) | |
   





[GitHub] [hudi] hudi-bot edited a comment on pull request #2210: [HUDI-1348] Provide option to clean up DFS sources

Posted by GitBox <gi...@apache.org>.
hudi-bot edited a comment on pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#issuecomment-862028641


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=210",
       "triggerID" : "b845e34d11e4e44e2b41e2089349baddc3a10b80",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b845e34d11e4e44e2b41e2089349baddc3a10b80 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=210) 
   * a174c4ed2b4c13a032a38afdb0a21b58a7b6cf25 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>

