You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by HeartSaVioR <gi...@git.apache.org> on 2018/11/05 22:18:31 UTC

[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

GitHub user HeartSaVioR opened a pull request:

    https://github.com/apache/spark/pull/22952

    [SPARK-20568][SS] Rename files which are completed in previous batch

    ## What changes were proposed in this pull request?
    
    This patch adds the option to rename files which are completed in previous batch, so that end users can clean up processed files to save their storage.
    
    It is only applied to "micro-batch", since for batch all input files must be kept to get same result across multiple query executions.
    
    ## How was this patch tested?
    
    Added UT, manually tested with Mac local. (The logic is very simple so not sure we need to verify with HDFS manually.)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HeartSaVioR/spark SPARK-20568

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22952.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22952
    
----
commit 8a1d2e187c667833b2de8eb4cba2fa04dca9c6ff
Author: Jungtaek Lim <ka...@...>
Date:   2018-11-05T04:32:51Z

    SPARK-20568 Rename files which are completed in previous batch

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #99855 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99855/testReport)** for PR 22952 at commit [`998e769`](https://github.com/apache/spark/commit/998e769c3b552c39736af7814f60be895dbd90d4).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  class FileStreamSourceCleaner(fileSystem: FileSystem, sourcePath: Path,`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r232824087
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
             "s3://a/dataset.txt"<br/>
             "s3n://a/b/dataset.txt"<br/>
    --- End diff --
    
    I'd drop s3n  & s3 refs as they have gone from deprecated to deceased


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #99450 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99450/testReport)** for PR 22952 at commit [`f59c35a`](https://github.com/apache/spark/commit/f59c35aad5a0c2b436fe36c3116cce532fa0beee).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @gaborgsomogyi @steveloughran 
    OK. I'll change the approach to just check against final path for each moving. As @steveloughran stated, it may bring performance hit for each checking when dealing with object stores, so we may also need to provide a way to disable checking as well with caution. (Btw, if moving file in object store requires huge overhead rather than globing, slow globing may not be a big deal.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @gaborgsomogyi @steveloughran 
    Please take a look at 17b9b9a043ead0d448048c88b30f544228bd230b which just leverages GlobFilter. You may find that when the depth of archive path is more than 2, there's no chance for final destination to be picked up from FileStreamSource: so most of usual cases overlap will not happen, as well as Spark can determine this as only comparing depths.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @gaborgsomogyi 
    That's really huge... Could you share how you tested? Like which FS (local/HDFS/S3/etc), directory structure, count of files... That would help me understanding the impact and also help on testing manually when we deal with optimization.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98917/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @HeartSaVioR 
    I've taken a look at the possibilities:
    * [GlobExpander](https://github.com/apache/hadoop/blob/a55d6bba71c81c1c4e9d8cd11f55c78f10a548b0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobExpander.java#L63) is private
    * `globStatus` recursively is not an option because of it's poor performance
    * `globStatus` with limited scope can be an option but there are cases where it might take some execution time
    * Print warnings and not moving files is an option which seems feasible



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #99450 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99450/testReport)** for PR 22952 at commit [`f59c35a`](https://github.com/apache/spark/commit/f59c35aad5a0c2b436fe36c3116cce532fa0beee).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237342362
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -257,16 +289,65 @@ class FileStreamSource(
        * equal to `end` and will only request offsets greater than `end` in the future.
        */
       override def commit(end: Offset): Unit = {
    -    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
    -    // and the value of the maxFileAge parameter.
    +    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
    +      val curPath = new Path(entry.path)
    --- End diff --
    
    Will address.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #98919 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98919/testReport)** for PR 22952 at commit [`ca26b41`](https://github.com/apache/spark/commit/ca26b4136adc09fb9015c973953b50d894fc8779).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r231717554
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
             "s3://a/dataset.txt"<br/>
             "s3n://a/b/dataset.txt"<br/>
             "s3a://a/b/c/dataset.txt"<br/>
    +        <br/>
    +        <code>renameCompletedFiles</code>: whether to rename completed files in previous batch (default: false). If the option is enabled, input file will be renamed with additional postfix "_COMPLETED_". This is useful to clean up old input files to save space in storage.
    --- End diff --
    
    Totally agreed, and that matches the option 3 I've proposed. And option 1 would not affect much on critical path in a batch since rename operations will be enqueued and background thread will take care.
    
    For option 1, guaranteeing makes the thing being complicated. If we are OK to NOT guarantee that all processed files are renamed, we can take the renaming in background (like option 1) without handling backpressure, and simply drop the requests in queue with logging if the size is beyond the threshold or JVM is shutting down.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237341425
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -257,16 +289,65 @@ class FileStreamSource(
        * equal to `end` and will only request offsets greater than `end` in the future.
        */
       override def commit(end: Offset): Unit = {
    -    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
    -    // and the value of the maxFileAge parameter.
    +    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
    +      val curPath = new Path(entry.path)
    +      val curPathUri = curPath.toUri
    +
    +      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
    +      try {
    +        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
    +        if (!fs.exists(newPath.getParent)) {
    +          fs.mkdirs(newPath.getParent)
    +        }
    +
    +        logDebug(s"Archiving completed file $curPath to $newPath")
    +        fs.rename(curPath, newPath)
    --- End diff --
    
    Yeah, I guess the patch prevents the case if it works like my expectation, but I'm also in favor of defensive programming and logging would be better for end users. Will address.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r235632809
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -257,16 +258,64 @@ class FileStreamSource(
        * equal to `end` and will only request offsets greater than `end` in the future.
        */
       override def commit(end: Offset): Unit = {
    -    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
    -    // and the value of the maxFileAge parameter.
    +    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
    +      val curPath = new Path(entry.path)
    +      val curPathUri = curPath.toUri
    +
    +      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
    +      if (!fs.exists(newPath.getParent)) {
    --- End diff --
    
    Nice finding. Will address.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    > HDFS does not support it yet, though on the way, see https://issues.apache.org/jira/browse/HADOOP-10019
    
    That's an old patch; I don't know of any active dev there.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #99855 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99855/testReport)** for PR 22952 at commit [`998e769`](https://github.com/apache/spark/commit/998e769c3b552c39736af7814f60be895dbd90d4).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @HeartSaVioR It was tested with S3 and the trick is to have HUGE amount of files. Listing files is pathologically bad as @steveloughran stated, glob is even worse.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237314459
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
             "s3://a/dataset.txt"<br/>
             "s3n://a/b/dataset.txt"<br/>
             "s3a://a/b/c/dataset.txt"<br/>
    +        <code>cleanSource</code>: option to clean up completed files after processing.<br/>
    +        Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".<br/>
    +        When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.<br/>
    +        Spark will move source files respecting its own path. For example, if the path of source file is "/a/b/dataset.txt" and the path of archive directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt"<br/>
    +        NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enbling this option would reduce the cost to list source files which is considered as a heavy operation.<br/>
    +        NOTE 2: The source path should not be used from multiple queries when enabling this option, because source files will be moved or deleted which behavior may impact the other queries.
    --- End diff --
    
    NOTE 2: The source path should not be used from multiple **sources or** queries when enabling this option, because source files will be moved or deleted which behavior may impact the other **sources and** queries.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r235314493
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -257,16 +258,64 @@ class FileStreamSource(
        * equal to `end` and will only request offsets greater than `end` in the future.
        */
       override def commit(end: Offset): Unit = {
    -    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
    -    // and the value of the maxFileAge parameter.
    +    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
    +      val curPath = new Path(entry.path)
    +      val curPathUri = curPath.toUri
    +
    +      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
    +      if (!fs.exists(newPath.getParent)) {
    --- End diff --
    
    These fs operation can also throw exception. Why not covered these as well with try?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #98491 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98491/testReport)** for PR 22952 at commit [`8a1d2e1`](https://github.com/apache/spark/commit/8a1d2e187c667833b2de8eb4cba2fa04dca9c6ff).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #98917 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98917/testReport)** for PR 22952 at commit [`3f6b5fb`](https://github.com/apache/spark/commit/3f6b5fbf01b2e78dfc9ecf7e3b45ef771fec74a7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #99851 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99851/testReport)** for PR 22952 at commit [`22dce0c`](https://github.com/apache/spark/commit/22dce0c1d17de360c0d887a0352c1e8f57761db7).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #98918 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98918/testReport)** for PR 22952 at commit [`33c5681`](https://github.com/apache/spark/commit/33c5681ab022116133576e4e27c50e346c1ffba9).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99855/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237315173
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -100,6 +101,36 @@ class FileStreamSource(
     
       logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
     
    +  ensureNoOverlapBetweenSourceAndArchivePath()
    --- End diff --
    
    Could you do this check only when CleanSourceMode is `ARCHIVE`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237320515
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -100,6 +101,36 @@ class FileStreamSource(
     
       logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
     
    +  ensureNoOverlapBetweenSourceAndArchivePath()
    +
    +  private def ensureNoOverlapBetweenSourceAndArchivePath(): Unit = {
    +    @tailrec
    +    def removeGlob(path: Path): Path = {
    +      if (path.getName.contains("*")) {
    +        removeGlob(path.getParent)
    +      } else {
    +        path
    +      }
    +    }
    +
    +    sourceOptions.sourceArchiveDir match {
    +      case None =>
    +      case Some(archiveDir) =>
    +        val sourceUri = removeGlob(qualifiedBasePath).toUri
    +        val archiveUri = new Path(archiveDir).toUri
    +
    +        val sourcePath = sourceUri.getPath
    +        val archivePath = archiveUri.getPath
    --- End diff --
    
    we need to use `fs.makeQualified` to turn all user provided paths to absolute paths as the user may just pass a relative path.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237341854
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -257,16 +289,65 @@ class FileStreamSource(
        * equal to `end` and will only request offsets greater than `end` in the future.
        */
       override def commit(end: Offset): Unit = {
    -    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
    -    // and the value of the maxFileAge parameter.
    +    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
    +      val curPath = new Path(entry.path)
    +      val curPathUri = curPath.toUri
    +
    +      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
    +      try {
    +        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
    +        if (!fs.exists(newPath.getParent)) {
    +          fs.mkdirs(newPath.getParent)
    +        }
    +
    +        logDebug(s"Archiving completed file $curPath to $newPath")
    +        fs.rename(curPath, newPath)
    +      } catch {
    +        case NonFatal(e) =>
    +          // Log to error but swallow exception to avoid process being stopped
    +          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
    +      }
    +    }
    +
    +    def remove(entry: FileEntry): Unit = {
    +      val curPath = new Path(entry.path)
    +      try {
    +        logDebug(s"Removing completed file $curPath")
    +        fs.delete(curPath, false)
    +      } catch {
    +        case NonFatal(e) =>
    +          // Log to error but swallow exception to avoid process being stopped
    +          logWarning(s"Fail to remove $curPath / skip removing file.", e)
    +      }
    +    }
    +
    +    val logOffset = FileStreamSourceOffset(end).logOffset
    +    metadataLog.get(logOffset) match {
    --- End diff --
    
    Ah I didn't indicate that. Thanks for letting me know! Will address.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    > Provide additional option: delete (two options - 'rename' / 'delete' - are mutually exclusive)
    > 
    > Actually the actions end users are expected to take are 1. moving to archive directory (with compression or not) 2. delete periodically. If moving/renaming require non-trivial cost, end users may want to just delete files directly without backing up.
    
    +1 for this approach. The file listing cost is huge when the directory has a lot of files. I think one of the goals of this feature is reducing the file listing cost. Hence either delete the files or move to a different directory should be fine. Also could you try to make one simple option for `rename/delete`, such as `cleanSource` -> (`none`, `rename` or `delete`)? When the user picks up `rename`, they should be able to set the archive directory using another option.
    
    In addition, it would be great that we can document that whenever using this option, the same directory should not be used by multiple queries.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #98493 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98493/testReport)** for PR 22952 at commit [`fb01c60`](https://github.com/apache/spark/commit/fb01c60624389ee432d0a23afd14e956453cd22e).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #98491 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98491/testReport)** for PR 22952 at commit [`8a1d2e1`](https://github.com/apache/spark/commit/8a1d2e187c667833b2de8eb4cba2fa04dca9c6ff).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #99180 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99180/testReport)** for PR 22952 at commit [`007f5d5`](https://github.com/apache/spark/commit/007f5d53e7a130ae08bc36494a9f04c882e7c711).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @zsxwing @dongjoon-hyun @steveloughran 
    Thanks all for the valuable feedback! I applied review comments.
    
    While I covered the new feature with new UTs, I'm yet to test this manually with HDFS. I'll find the time to do manual test in next week. For cloud storages, TBH, it's not easy for me to do manual test against them, so I'd wish to lean on reviewers' eyes and experiences.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @zsxwing Btw, how do you think about addressing background move/deletion (I had thought and @gaborgsomogyi also suggested as well) into separate issue? I guess putting more feature would let you spend more time to review.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #99862 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99862/testReport)** for PR 22952 at commit [`998e769`](https://github.com/apache/spark/commit/998e769c3b552c39736af7814f60be895dbd90d4).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @gaborgsomogyi @steveloughran 
    `GlobExpander` only looks like handling `{}` pattern. We need to still deal with `*` and `?` which can't be expanded like this. 
    
    It would only work if we would be OK with restricting descendants of multiple paths (for now we restrict descendants of one path), so while it would help fixing the bug of current patch, it might be still too restrictive.
    
    I think changing Hadoop version because of this costs too much. If we really would like to go, only viable solution is copying the code. (Actually we can also just reimplement it since its requirements are like a kind of assignment, though we may end up with similar code.)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r231378889
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
             "s3://a/dataset.txt"<br/>
             "s3n://a/b/dataset.txt"<br/>
             "s3a://a/b/c/dataset.txt"<br/>
    +        <br/>
    +        <code>renameCompletedFiles</code>: whether to rename completed files in previous batch (default: false). If the option is enabled, input file will be renamed with additional postfix "_COMPLETED_". This is useful to clean up old input files to save space in storage.
    --- End diff --
    
    Hi, @HeartSaVioR .
    Renaming is expensive in S3, isn't it? I don't worry about HDFS, but do you know if there is potential side effects like performance degradation in the cloud environment, especially with continuous processing mode?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    cc. @zsxwing 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    bq. GlobExpander is private
    
    that's correctable. 
    
    1. Make sure there are standalone tests (if none around)
    1. Make sure that off filesystem.md there's something declaring normatively WTF it expands
    1. Provide a patch moving from @Private  to @Public/Evolving
    
    We can apply those changes to branch-2 and trunk, and, because the class is already public you can use it knowing that in the development line of hadoop we've committed to not moving, deleting or breaking it.
    
    As usual: file a HADOOP-* jira with the patch, link me to it, I'll do my best to review


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237481604
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -257,16 +289,65 @@ class FileStreamSource(
        * equal to `end` and will only request offsets greater than `end` in the future.
        */
       override def commit(end: Offset): Unit = {
    -    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
    -    // and the value of the maxFileAge parameter.
    +    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
    +      val curPath = new Path(entry.path)
    +      val curPathUri = curPath.toUri
    +
    +      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
    +      try {
    +        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
    +        if (!fs.exists(newPath.getParent)) {
    +          fs.mkdirs(newPath.getParent)
    +        }
    +
    +        logDebug(s"Archiving completed file $curPath to $newPath")
    +        fs.rename(curPath, newPath)
    +      } catch {
    +        case NonFatal(e) =>
    +          // Log to error but swallow exception to avoid process being stopped
    +          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
    +      }
    +    }
    +
    +    def remove(entry: FileEntry): Unit = {
    +      val curPath = new Path(entry.path)
    --- End diff --
    
    I just modified existing UT to have space and % in directory name as well as file name.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98493/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98918/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r231634109
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
             "s3://a/dataset.txt"<br/>
             "s3n://a/b/dataset.txt"<br/>
             "s3a://a/b/c/dataset.txt"<br/>
    +        <br/>
    +        <code>renameCompletedFiles</code>: whether to rename completed files in previous batch (default: false). If the option is enabled, input file will be renamed with additional postfix "_COMPLETED_". This is useful to clean up old input files to save space in storage.
    --- End diff --
    
    @HeartSaVioR . Does Flink/Storm have this feature? Or are there JIRA issues? I'm wondering if this is popular in the streaming engines and how they are handling this in the cloud situation.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r235632872
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala ---
    @@ -74,6 +76,43 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
        */
       val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
     
    +  /**
    +   * The archive directory to move completed files. The option will be only effective when
    +   * "cleanSource" is set to "archive".
    +   *
    +   * Note that the completed file will be moved to this archive directory with respecting to
    +   * its own path.
    +   *
    +   * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive
    +   * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt".
    +   */
    +  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
    +
    +  /**
    +   * Defines how to clean up completed files. Available options are "archive", "delete", "no_op".
    +   */
    +  val cleanSource: CleanSourceMode.Value = {
    +    val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString)
    +      .toUpperCase(Locale.ROOT)
    +
    +    val matchedModeOpt = CleanSourceMode.values.find(_.toString == modeStrOption)
    +    if (matchedModeOpt.isEmpty) {
    --- End diff --
    
    Will address.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @zsxwing 
    Yeah, it would be ideal we can enforce `archivePath` to which don't have any possibility to match against source path (glob), so my approach was to find directory which is the base directory without having glob in ancestor, and `archive path + base directory of source path` doesn't belong to sub-directory of found directory.
    
    For example, suppose source path is `/a/b/c/*/ef?/*/g/h/*/i`, then base directory of source path would be `/a/b/c`, and `archive path + base directory of source path` should not belong to sub-directory of `/a/b/c`.
    (My code has a bug for finding the directory so need to fix it.)
    
    This is not an elegant approach and the approach has false-positive, ending up restricting the archive path which actually doesn't make overlap (too restrict), but it would guarantee two paths never overlap. (So no need to re-check when renaming file.)
    
    I guess the approach might be reasonable because in practice end users would avoid themselves have to think about complicated case on overlaps, and just isolate two paths.
    
    What do you think about this approach?
    
    cc. @gaborgsomogyi Could you also help validating my approach? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    I'm now also playing with Hadoop glob relevant classes to check whether final destination matches source path glob pattern or not.
    
    * Looks like we can leverage `GlobPattern` but it is marked as `@Private`.
    * `GlobFilter` is `@Public` but it only checks against `path.getName()` so it would only compare with the last component. If we would like to leverage this, we should split all components and compare with multiple filters.
    
    Will update the code and test once I find a viable approach.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99851/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #99851 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99851/testReport)** for PR 22952 at commit [`22dce0c`](https://github.com/apache/spark/commit/22dce0c1d17de360c0d887a0352c1e8f57761db7).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  class FileStreamSourceCleaner(fileSystem: FileSystem, sourcePath: Path,`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99180/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #98917 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98917/testReport)** for PR 22952 at commit [`3f6b5fb`](https://github.com/apache/spark/commit/3f6b5fbf01b2e78dfc9ecf7e3b45ef771fec74a7).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @zsxwing Please also take a look: I guess I addressed glob overlap issue as well.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @HeartSaVioR 
    Related the glob part @zsxwing pointed out an important problem. Glob pattern is much more than checking `*` and `?`, see the link up. For simplicity take this test:
    ```
    ...
          val sourcePath = "/hello/worl{d}"
          val archivePath = "/hello/world/spark"
    ...
    ```
    This should throw `IllegalArgumentException` but proceeding without exception.
    A glob parser would be good to be used.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237342346
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -100,6 +101,36 @@ class FileStreamSource(
     
       logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
     
    +  ensureNoOverlapBetweenSourceAndArchivePath()
    +
    +  private def ensureNoOverlapBetweenSourceAndArchivePath(): Unit = {
    +    @tailrec
    +    def removeGlob(path: Path): Path = {
    +      if (path.getName.contains("*")) {
    +        removeGlob(path.getParent)
    +      } else {
    +        path
    +      }
    +    }
    +
    +    sourceOptions.sourceArchiveDir match {
    +      case None =>
    +      case Some(archiveDir) =>
    +        val sourceUri = removeGlob(qualifiedBasePath).toUri
    +        val archiveUri = new Path(archiveDir).toUri
    +
    +        val sourcePath = sourceUri.getPath
    +        val archivePath = archiveUri.getPath
    --- End diff --
    
    Nice finding. Will address.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #98918 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98918/testReport)** for PR 22952 at commit [`33c5681`](https://github.com/apache/spark/commit/33c5681ab022116133576e4e27c50e346c1ffba9).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237315718
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -257,16 +289,65 @@ class FileStreamSource(
        * equal to `end` and will only request offsets greater than `end` in the future.
        */
       override def commit(end: Offset): Unit = {
    -    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
    -    // and the value of the maxFileAge parameter.
    +    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
    +      val curPath = new Path(entry.path)
    +      val curPathUri = curPath.toUri
    +
    +      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
    +      try {
    +        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
    +        if (!fs.exists(newPath.getParent)) {
    +          fs.mkdirs(newPath.getParent)
    +        }
    +
    +        logDebug(s"Archiving completed file $curPath to $newPath")
    +        fs.rename(curPath, newPath)
    --- End diff --
    
    It's better to also check the return value of `rename`. A user may reuse a source archive dir and cause path conflicts. We should also log this.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237319903
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -257,16 +289,65 @@ class FileStreamSource(
        * equal to `end` and will only request offsets greater than `end` in the future.
        */
       override def commit(end: Offset): Unit = {
    -    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
    -    // and the value of the maxFileAge parameter.
    +    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
    +      val curPath = new Path(entry.path)
    +      val curPathUri = curPath.toUri
    +
    +      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
    +      try {
    +        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
    +        if (!fs.exists(newPath.getParent)) {
    +          fs.mkdirs(newPath.getParent)
    +        }
    +
    +        logDebug(s"Archiving completed file $curPath to $newPath")
    +        fs.rename(curPath, newPath)
    +      } catch {
    +        case NonFatal(e) =>
    +          // Log to error but swallow exception to avoid process being stopped
    +          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
    +      }
    +    }
    +
    +    def remove(entry: FileEntry): Unit = {
    +      val curPath = new Path(entry.path)
    --- End diff --
    
    `val curPath = new Path(new URI(entry.path))` to make it escape/unescape path properly. `entry.path` was created from `Path.toUri.toString`. Could you also add a unit test to test special paths such as `/a/b/a b%.txt`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #99862 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99862/testReport)** for PR 22952 at commit [`998e769`](https://github.com/apache/spark/commit/998e769c3b552c39736af7814f60be895dbd90d4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  class FileStreamSourceCleaner(fileSystem: FileSystem, sourcePath: Path,`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    I feel the patch is simple to skip verifying manually against HDFS, but I'll try to spin up HDFS cluster and test this manually.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @gaborgsomogyi 
    Thanks for taking care, but I guess I can manage it. I'll ask for help when I can't go back to this one.
    This patch (latest change) hasn't get any feedback on committers yet so let's not rush on this and wait for it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237314690
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
             "s3://a/dataset.txt"<br/>
             "s3n://a/b/dataset.txt"<br/>
             "s3a://a/b/c/dataset.txt"<br/>
    +        <code>cleanSource</code>: option to clean up completed files after processing.<br/>
    +        Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".<br/>
    +        When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.<br/>
    +        Spark will move source files respecting its own path. For example, if the path of source file is "/a/b/dataset.txt" and the path of archive directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt"<br/>
    +        NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enbling this option would reduce the cost to list source files which is considered as a heavy operation.<br/>
    +        NOTE 2: The source path should not be used from multiple queries when enabling this option, because source files will be moved or deleted which behavior may impact the other queries.
    --- End diff --
    
    NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r232825455
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
             "s3://a/dataset.txt"<br/>
             "s3n://a/b/dataset.txt"<br/>
             "s3a://a/b/c/dataset.txt"<br/>
    +        <br/>
    +        <code>renameCompletedFiles</code>: whether to rename completed files in previous batch (default: false). If the option is enabled, input file will be renamed with additional postfix "_COMPLETED_". This is useful to clean up old input files to save space in storage.
    --- End diff --
    
    S3 rename is O(data), whereas for real filesystems it is O(1). Azure is usually  O(1) unless some cross-shard move takes place, then it drops to O(data)...much rarer though.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237340601
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
             "s3://a/dataset.txt"<br/>
             "s3n://a/b/dataset.txt"<br/>
             "s3a://a/b/c/dataset.txt"<br/>
    +        <code>cleanSource</code>: option to clean up completed files after processing.<br/>
    +        Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".<br/>
    +        When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.<br/>
    +        Spark will move source files respecting its own path. For example, if the path of source file is "/a/b/dataset.txt" and the path of archive directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt"<br/>
    +        NOTE: Both archiving (via moving) or deleting completed files would introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enbling this option would reduce the cost to list source files which is considered as a heavy operation.<br/>
    +        NOTE 2: The source path should not be used from multiple queries when enabling this option, because source files will be moved or deleted which behavior may impact the other queries.
    --- End diff --
    
    Nice finding. I missed the case which multiple sources in same query refer same file directory. Will address.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #98919 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98919/testReport)** for PR 22952 at commit [`ca26b41`](https://github.com/apache/spark/commit/ca26b4136adc09fb9015c973953b50d894fc8779).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98919/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @gaborgsomogyi 
    Yeah I also thought about the idea (commented above) but I've lost focus on other task. Given that smaller patch is better to be reviewed easily and current patch works well (except overheads on cleaning in same thread), would we split this up and address it to another issue?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r235632761
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
             "s3://a/dataset.txt"<br/>
             "s3n://a/b/dataset.txt"<br/>
             "s3a://a/b/c/dataset.txt"<br/>
    +        <code>cleanSource</code>: option to clean up completed files after processing.<br/>
    +        Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op".<br/>
    +        When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must be outside of source path, to ensure archived files are never included to new source files again.<br/>
    --- End diff --
    
    Yeah I guess you're right. I'll add a logic to check in initialization on FileStreamSource.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @gaborgsomogyi 
    Thanks for reviewing! I addressed your review comments except asynchronous cleanup, which might be able to break down to separated issue.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @HeartSaVioR ok, feel free to ping me if review needed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @HeartSaVioR I'm fine with this, on the other hand if you're focusing on different things I'm happy to create a jira + PR for the separate thread thing to speed up processing.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r231704852
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
             "s3://a/dataset.txt"<br/>
             "s3n://a/b/dataset.txt"<br/>
             "s3a://a/b/c/dataset.txt"<br/>
    +        <br/>
    +        <code>renameCompletedFiles</code>: whether to rename completed files in previous batch (default: false). If the option is enabled, input file will be renamed with additional postfix "_COMPLETED_". This is useful to clean up old input files to save space in storage.
    --- End diff --
    
    The essential thing should be slow. Without any written notice, the users will complain again and again due to the performance regression. Frequently, the users don't say they changed this kind of setting. Instead, they say Spark suddenly shows regressions in their environment.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99567/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99833/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @HeartSaVioR 
    I've taken a deeper look at the overlap thing and found the following.
    
    * Added an additional test which produced odd result:
    ```
    ...
          val sourcePath = "/hello/worl"
          val archivePath = "/hello/world/spark"
    ...
    ```
    This has thrown `IllegalArgumentException` but the `sourcePath` is different than `archivePath`. 
    This happens without any glob magic.
    
    * This approach may not work if there are symlinks involved (`fs.makeQualified` doesn't make any link resolve).
      * HDFS does not support it yet, though on the way, see https://issues.apache.org/jira/browse/HADOOP-10019
      * S3 and ADLS does not support it
    
      All in all this part is fine now.
    
    Checking the glob part...



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237342072
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -257,16 +289,65 @@ class FileStreamSource(
        * equal to `end` and will only request offsets greater than `end` in the future.
        */
       override def commit(end: Offset): Unit = {
    -    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
    -    // and the value of the maxFileAge parameter.
    +    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
    +      val curPath = new Path(entry.path)
    +      val curPathUri = curPath.toUri
    +
    +      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
    +      try {
    +        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
    +        if (!fs.exists(newPath.getParent)) {
    +          fs.mkdirs(newPath.getParent)
    +        }
    +
    +        logDebug(s"Archiving completed file $curPath to $newPath")
    +        fs.rename(curPath, newPath)
    +      } catch {
    +        case NonFatal(e) =>
    +          // Log to error but swallow exception to avoid process being stopped
    +          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
    +      }
    +    }
    +
    +    def remove(entry: FileEntry): Unit = {
    +      val curPath = new Path(entry.path)
    --- End diff --
    
    Yeah... actually I was somewhat confused I have to escape/unescape for path. Thanks for suggestion. Will address and add a new unit test for testing it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    retest this, please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    BTW: [HADOOP-15748](https://issues.apache.org/jira/browse/HADOOP-15748), *S3 listing inconsistency can raise NPE in globber*
    
    Could be backported to 2.8+; low risk


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #98493 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/98493/testReport)** for PR 22952 at commit [`fb01c60`](https://github.com/apache/spark/commit/fb01c60624389ee432d0a23afd14e956453cd22e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r231429484
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
             "s3://a/dataset.txt"<br/>
             "s3n://a/b/dataset.txt"<br/>
             "s3a://a/b/c/dataset.txt"<br/>
    +        <br/>
    +        <code>renameCompletedFiles</code>: whether to rename completed files in previous batch (default: false). If the option is enabled, input file will be renamed with additional postfix "_COMPLETED_". This is useful to clean up old input files to save space in storage.
    --- End diff --
    
    Hi @dongjoon-hyun , thanks for pointing out good point! I was being concerned about only filesystem/HDFS case and not familiar with cloud environment.
    
    I guess we have possible options here:
    
    1. Rename in background thread. 
    
    For option 1, we may want to restrict the max files to enqueue, and when it reaches the max we may handle some of them synchronously. And we also may need to postpone JVM shutdown until all enqueued files are renamed.
    
    2. Provide additional option: delete (options are mutually exclusive)
    
    Actually the actions end users are expected to take are 1. moving to archive directory (with compression or not) 2. delete periodically. If moving/renaming require non-trivial cost, end users may want to just delete files directly without backing up.
    
    3. Document the overhead to description of option.
    
    While we can not clearly say how much the cost is, we can explain the fact the cleanup operation may affect processing of batch.
    
    Provided options are not mutually exclusive.
    
    cc. to @steveloughran - I think you're expert on cloud storage: could you provide your thought on this?
    also cc. to @zsxwing in case of missing.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    > @zsxwing Btw, how do you think about addressing background move/deletion (I had thought and
    
    Yeah, this can be done in a separate ticket.
    
    I was playing with `org.apache.hadoop.fs.GlobFilter` to see how to detect the overlap. But one major issue is before getting the target path, we don't know whether a path will match [the glob pattern](https://self-learning-java-tutorial.blogspot.com/2016/01/hadoop-java-globbing.html) or not.
    
    The worst case, we can check the overlap when parsing the options for a normal path. For glob path, we can use `GlobFilter/GlobPattern` to check before doing the rename, in which case we can just use the target path.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r232869187
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
             "s3://a/dataset.txt"<br/>
             "s3n://a/b/dataset.txt"<br/>
    --- End diff --
    
    This looks like beyond of this PR: we can address it in separate PR. Could you raise another one?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @HeartSaVioR It's a question what is not big deal, I've seen ~1 hour glob request when huge amount of files stored :)
    If file move is even worse one more reason to move it to separate thread.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #99567 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99567/testReport)** for PR 22952 at commit [`79fa3e0`](https://github.com/apache/spark/commit/79fa3e0f8b3fc2a27165c3af02df96b6d4d354cb).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #99833 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99833/testReport)** for PR 22952 at commit [`17b9b9a`](https://github.com/apache/spark/commit/17b9b9a043ead0d448048c88b30f544228bd230b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  class FileStreamSourceCleaner(fileSystem: FileSystem, sourceGlobFilters: Seq[GlobFilter])`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/98491/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237319928
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -257,16 +289,65 @@ class FileStreamSource(
        * equal to `end` and will only request offsets greater than `end` in the future.
        */
       override def commit(end: Offset): Unit = {
    -    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
    -    // and the value of the maxFileAge parameter.
    +    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
    +      val curPath = new Path(entry.path)
    --- End diff --
    
    ditto


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99450/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @HeartSaVioR @steveloughran 
    As I see not only `*` and `?` missing but `[]` also.
    
    * Having glob parser in spark and supporting it I think it's too heavy and brittle.
    * Considering these I would solve it with warnings + caveat message in the doc (mentioning the slow globbing on object stores).
    
    As a separate offtopic just wondering how hadoop's globbing works if expander doesn't support all the glob elements. Maybe other operators (like `[]`) handled in different code part!?



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @zsxwing @gaborgsomogyi 
    What we were trying to do is enforcing archive path so that moved files will not make overlap with source path. There may be same file name with different directory so I'm also trying to persist its own path in final archived path, which means archive files will not be placed in same directory.
    
    Based on above, I thought enforcing archive path with checking glob path is not easy to do, because without knowing final archive path (per file) we can't check it matches with glob pattern. That's why I just would rather restrict all subdirectories instead of finding a way to check against glob pattern.
    
    Actually I'm a bit afraid that we might be putting too much complexity on enforcing archive path. If we are OK with not enforcing archive path and just verify the final archive path doesn't overlap source path per each source file, it would be simple to do. We can make Spark not moving file and log warning message to let end users specify other directory.
    
    Would like to hear everyone's thought and idea. Thanks in advance!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #99567 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99567/testReport)** for PR 22952 at commit [`79fa3e0`](https://github.com/apache/spark/commit/79fa3e0f8b3fc2a27165c3af02df96b6d4d354cb).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r231705717
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
             "s3://a/dataset.txt"<br/>
             "s3n://a/b/dataset.txt"<br/>
             "s3a://a/b/c/dataset.txt"<br/>
    +        <br/>
    +        <code>renameCompletedFiles</code>: whether to rename completed files in previous batch (default: false). If the option is enabled, input file will be renamed with additional postfix "_COMPLETED_". This is useful to clean up old input files to save space in storage.
    --- End diff --
    
    For example, http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
    
    ```
    Since schema merging is a relatively expensive operation, 
    and is not a necessity in most cases, we turned it off
    by default starting from 1.5.0.
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #99180 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99180/testReport)** for PR 22952 at commit [`007f5d5`](https://github.com/apache/spark/commit/007f5d53e7a130ae08bc36494a9f04c882e7c711).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237340952
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala ---
    @@ -74,6 +76,39 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
        */
       val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
     
    +  /**
    +   * The archive directory to move completed files. The option will be only effective when
    +   * "cleanSource" is set to "archive".
    +   *
    +   * Note that the completed file will be moved to this archive directory with respecting to
    +   * its own path.
    +   *
    +   * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive
    +   * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt".
    +   */
    +  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
    +
    +  /**
    +   * Defines how to clean up completed files. Available options are "archive", "delete", "no_op".
    +   */
    +  val cleanSource: CleanSourceMode.Value = {
    +    val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString)
    --- End diff --
    
    OK will address.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/spark/pull/22952
  
     > Looks like we can leverage GlobPattern but it is marked as @Private.
    
    I will happily switch this from private to public/evolving if you submit a patch against hadoop-trunk; backport it. Most recent changes to that class were 2015 (!) HADOOP-12436 and jan 2017, HADOOP-13976, newline handling. Nobody is going to modify that class out of fear of breaking things.
    
    Much easier for me to review and commit than to write a patch myself and try to get it reviewed...


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    **[Test build #99833 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99833/testReport)** for PR 22952 at commit [`17b9b9a`](https://github.com/apache/spark/commit/17b9b9a043ead0d448048c88b30f544228bd230b).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237340938
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -100,6 +101,36 @@ class FileStreamSource(
     
       logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
     
    +  ensureNoOverlapBetweenSourceAndArchivePath()
    --- End diff --
    
    Ah yes missed it. Will address.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237319176
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -257,16 +289,65 @@ class FileStreamSource(
        * equal to `end` and will only request offsets greater than `end` in the future.
        */
       override def commit(end: Offset): Unit = {
    -    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
    -    // and the value of the maxFileAge parameter.
    +    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
    +      val curPath = new Path(entry.path)
    +      val curPathUri = curPath.toUri
    +
    +      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
    +      try {
    +        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
    +        if (!fs.exists(newPath.getParent)) {
    +          fs.mkdirs(newPath.getParent)
    +        }
    +
    +        logDebug(s"Archiving completed file $curPath to $newPath")
    +        fs.rename(curPath, newPath)
    +      } catch {
    +        case NonFatal(e) =>
    +          // Log to error but swallow exception to avoid process being stopped
    +          logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
    +      }
    +    }
    +
    +    def remove(entry: FileEntry): Unit = {
    +      val curPath = new Path(entry.path)
    +      try {
    +        logDebug(s"Removing completed file $curPath")
    +        fs.delete(curPath, false)
    +      } catch {
    +        case NonFatal(e) =>
    +          // Log to error but swallow exception to avoid process being stopped
    +          logWarning(s"Fail to remove $curPath / skip removing file.", e)
    +      }
    +    }
    +
    +    val logOffset = FileStreamSourceOffset(end).logOffset
    +    metadataLog.get(logOffset) match {
    --- End diff --
    
    you can use `val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)` to use the underlying cache in FileStreamSourceLog.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by gaborgsomogyi <gi...@git.apache.org>.
Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r235312035
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala ---
    @@ -74,6 +76,43 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
        */
       val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
     
    +  /**
    +   * The archive directory to move completed files. The option will be only effective when
    +   * "cleanSource" is set to "archive".
    +   *
    +   * Note that the completed file will be moved to this archive directory with respecting to
    +   * its own path.
    +   *
    +   * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive
    +   * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt".
    +   */
    +  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
    +
    +  /**
    +   * Defines how to clean up completed files. Available options are "archive", "delete", "no_op".
    +   */
    +  val cleanSource: CleanSourceMode.Value = {
    +    val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString)
    +      .toUpperCase(Locale.ROOT)
    +
    +    val matchedModeOpt = CleanSourceMode.values.find(_.toString == modeStrOption)
    +    if (matchedModeOpt.isEmpty) {
    --- End diff --
    
    This can be simplified something like:
    ```
        matchedModeOpt match {
          case None =>
            throw new IllegalArgumentException(s"Invalid mode for clean source option $modeStrOption." +
              s" Must be one of ${CleanSourceMode.values.mkString(",")}")
          case Some(matchedMode) =>
            if (matchedMode == CleanSourceMode.ARCHIVE && sourceArchiveDir.isEmpty) {
              throw new IllegalArgumentException("Archive mode must be used with 'sourceArchiveDir' " +
                "option.")
            }
            matchedMode
        }
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    @zsxwing Thanks for the detailed review! Addressed review comments.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237200636
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala ---
    @@ -74,6 +76,39 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
        */
       val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
     
    +  /**
    +   * The archive directory to move completed files. The option will be only effective when
    +   * "cleanSource" is set to "archive".
    +   *
    +   * Note that the completed file will be moved to this archive directory with respecting to
    +   * its own path.
    +   *
    +   * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive
    +   * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt".
    +   */
    +  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
    +
    +  /**
    +   * Defines how to clean up completed files. Available options are "archive", "delete", "no_op".
    +   */
    +  val cleanSource: CleanSourceMode.Value = {
    +    val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString)
    --- End diff --
    
    nit: could you create a method to `CleanSourceMode` to convert a string to `CleanSourceMode.Value`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22952: [SPARK-20568][SS] Rename files which are complete...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r231695749
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -530,6 +530,8 @@ Here are the details of all the sources in Spark.
             "s3://a/dataset.txt"<br/>
             "s3n://a/b/dataset.txt"<br/>
             "s3a://a/b/c/dataset.txt"<br/>
    +        <br/>
    +        <code>renameCompletedFiles</code>: whether to rename completed files in previous batch (default: false). If the option is enabled, input file will be renamed with additional postfix "_COMPLETED_". This is useful to clean up old input files to save space in storage.
    --- End diff --
    
    @dongjoon-hyun 
    For Storm, it renames input file twice, 1. in process 2. completed (actually it is not a rename, but move to archive directory). HDFS spout is created at 2015 which I don't expect there's deep consideration on cloud storage.
    For Flink I have no idea, I'll explore how they handle it.
    
    I think the feature is just an essential thing in ETL situation: a comment in JIRA clearly shows why the feature is needed.
    https://issues.apache.org/jira/browse/SPARK-20568?focusedCommentId=16356929&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16356929


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99862/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/spark/pull/22952
  
    Hadoop FS glob filtering is pathologically bad  on object stores. 
    I have tried in the past to do an ~O(1) impl for S3 [HADOOP-13371](https://issues.apache.org/jira/browse/HADOOP-13371). While I could produce one which was efficient for test cases, it would suffer in the use case "selective pattern match at the top of a very wide tree", where you really do want to filter down aggressively for the topmost directory/directories.
    
    I think there you'd want to have a threshold as to how many path elements up you'd switch from ls dir + match into the full deep listfiles(recursive) scan 
    
    Not looked at it for ages. If someone does want to play there, welcome to take it up


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org