Posted to commits@spark.apache.org by va...@apache.org on 2020/01/17 18:46:07 UTC

[spark] branch master updated: [SPARK-29876][SS] Delete/archive file source completed files in separate thread

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new abf759a  [SPARK-29876][SS] Delete/archive file source completed files in separate thread
abf759a is described below

commit abf759a91e01497586b8bb6b7a314dd28fd6cff1
Author: Gabor Somogyi <ga...@gmail.com>
AuthorDate: Fri Jan 17 10:45:36 2020 -0800

    [SPARK-29876][SS] Delete/archive file source completed files in separate thread
    
    ### What changes were proposed in this pull request?
    [SPARK-20568](https://issues.apache.org/jira/browse/SPARK-20568) added the possibility to clean up completed files in a streaming query. Deleting/archiving uses the main thread, which can slow down processing. In this PR I've created a thread pool to handle file deletion/archival. The number of threads can be configured with `spark.sql.streaming.fileSource.cleaner.numThreads`.
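    
    A minimal usage sketch (the paths and query below are hypothetical, not part of this change) showing how the new config combines with the existing `cleanSource` option:
    
    ```scala
    // Hypothetical example: stream a directory and delete completed files;
    // the cleaner runs on the configured number of background threads.
    spark.conf.set("spark.sql.streaming.fileSource.cleaner.numThreads", "2")
    
    val df = spark.readStream
      .format("text")
      .option("cleanSource", "delete")   // or "archive" plus "sourceArchiveDir"
      .option("maxFilesPerTrigger", "1")
      .load("/tmp/streaming-input")
    
    val query = df.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/streaming-checkpoint")
      .start()
    ```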
    
    ### Why are the changes needed?
    To do file deletion/archival in a separate thread so the main processing thread isn't slowed down.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Existing unit tests.
    
    Closes #26502 from gaborgsomogyi/SPARK-29876.
    
    Authored-by: Gabor Somogyi <ga...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 docs/structured-streaming-programming-guide.md     |  5 +--
 .../org/apache/spark/sql/internal/SQLConf.scala    |  6 ++++
 .../sql/execution/streaming/FileStreamSource.scala | 40 +++++++++++++++++++---
 .../sql/streaming/FileStreamSourceSuite.scala      |  9 +++--
 4 files changed, 50 insertions(+), 10 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 306d688..429d456 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -551,9 +551,10 @@ Here are the details of all the sources in Spark.
         When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.<br/>
         For example, suppose you provide '/hello?/spark/*' as source pattern, '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' cannot be also used as the value of "sourceArchiveDir", as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK as it doesn't match.<br/>
         Spark will move source files respecting their own path. For example, if the path of source file is <code>/a/b/dataset.txt</code> and the path of archive directory is <code>/archived/here</code>, file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
-        NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
+        NOTE: Both archiving (via moving) and deleting completed files will introduce overhead (a slowdown, even if it happens in a separate thread) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost of listing source files, which can be an expensive operation.<br/>
+        The number of threads used by the completed file cleaner can be configured with <code>spark.sql.streaming.fileSource.cleaner.numThreads</code> (default: 1).<br/>
         NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match to any files in output directory of file stream sink.<br/>
-        NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
+        NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query. Spark may not clean up some source files in some circumstances - e.g. when the application doesn't shut down gracefully or too many files are queued for cleanup.
         <br/><br/>
         For file-format-specific options, see the related methods in <code>DataStreamReader</code>
         (<a href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>/<a
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 98a6551..279c79f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1301,6 +1301,12 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val FILE_SOURCE_CLEANER_NUM_THREADS =
+    buildConf("spark.sql.streaming.fileSource.cleaner.numThreads")
+      .doc("Number of threads used in the file source completed file cleaner.")
+      .intConf
+      .createWithDefault(1)
+
   val STREAMING_SCHEMA_INFERENCE =
     buildConf("spark.sql.streaming.schemaInference")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 39fb7f8..36f7002 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.net.URI
+import java.util.concurrent.ThreadPoolExecutor
 import java.util.concurrent.TimeUnit._
 
 import scala.util.control.NonFatal
@@ -30,7 +31,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
 
 /**
  * A very simple source that reads files from the given directory as they appear.
@@ -285,7 +288,7 @@ class FileStreamSource(
     }
   }
 
-  override def stop(): Unit = {}
+  override def stop(): Unit = sourceCleaner.foreach(_.stop())
 }
 
 
@@ -353,8 +356,35 @@ object FileStreamSource {
     def size: Int = map.size()
   }
 
-  private[sql] trait FileStreamSourceCleaner {
-    def clean(entry: FileEntry): Unit
+  private[sql] abstract class FileStreamSourceCleaner extends Logging {
+    private val cleanThreadPool: Option[ThreadPoolExecutor] = {
+      val numThreads = SQLConf.get.getConf(SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS)
+      if (numThreads > 0) {
+        logDebug(s"Cleaning file source on $numThreads separate thread(s)")
+        Some(ThreadUtils.newDaemonCachedThreadPool("file-source-cleaner-threadpool", numThreads))
+      } else {
+        logDebug("Cleaning file source on main thread")
+        None
+      }
+    }
+
+    def stop(): Unit = cleanThreadPool.foreach(ThreadUtils.shutdown(_))
+
+    def clean(entry: FileEntry): Unit = {
+      cleanThreadPool match {
+        case Some(p) =>
+          p.submit(new Runnable {
+            override def run(): Unit = {
+              cleanTask(entry)
+            }
+          })
+
+        case None =>
+          cleanTask(entry)
+      }
+    }
+
+    protected def cleanTask(entry: FileEntry): Unit
   }
 
   private[sql] object FileStreamSourceCleaner {
@@ -448,7 +478,7 @@ object FileStreamSource {
       filters.toList
     }
 
-    override def clean(entry: FileEntry): Unit = {
+    override protected def cleanTask(entry: FileEntry): Unit = {
       val curPath = new Path(new URI(entry.path))
       val newPath = new Path(baseArchivePath.toString.stripSuffix("/") + curPath.toUri.getPath)
 
@@ -472,7 +502,7 @@ object FileStreamSource {
   private[sql] class SourceFileRemover(fileSystem: FileSystem)
     extends FileStreamSourceCleaner with Logging {
 
-    override def clean(entry: FileEntry): Unit = {
+    override protected def cleanTask(entry: FileEntry): Unit = {
       val curPath = new Path(new URI(entry.path))
       try {
         logDebug(s"Removing completed file $curPath")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 2eb875c..632e007 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1636,7 +1636,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       withSQLConf(
         SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
         // Force deleting the old logs
-        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1",
+        SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS.key -> "0"
       ) {
         val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
           "cleanSource" -> "delete")
@@ -1680,7 +1681,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       withSQLConf(
         SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
         // Force deleting the old logs
-        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+        SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1",
+        SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS.key -> "0"
       ) {
         val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
           "cleanSource" -> "archive", "sourceArchiveDir" -> archiveDir.getAbsolutePath)
@@ -1749,7 +1751,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         withSQLConf(
           SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
           // Force deleting the old logs
-          SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+          SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1",
+          SQLConf.FILE_SOURCE_CLEANER_NUM_THREADS.key -> "0"
         ) {
           val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
             "cleanSource" -> cleanOption, "sourceArchiveDir" -> archiveDir.getAbsolutePath)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org