You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/12/17 06:45:50 UTC

[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r358624511
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ##########
 @@ -330,4 +341,96 @@ object FileStreamSource {
 
     def size: Int = map.size()
   }
+
+  /**
+   * Hook for cleaning up a single source file of the file stream source.
+   * The companion object's factory creates the implementations
+   * SourceFileArchiver and SourceFileRemover.
+   */
+  private[sql] trait FileStreamSourceCleaner {
+    /**
+     * Cleans up the file described by `entry`.
+     * NOTE(review): presumably invoked only for files the query has finished
+     * processing - confirm against the caller.
+     */
+    def clean(entry: FileEntry): Unit
+  }
+
+  private[sql] object FileStreamSourceCleaner {
+    /**
+     * Creates the cleaner that corresponds to `option.cleanSource`.
+     *
+     * @param fileSystem file system handed to the created cleaner
+     * @param sourcePath source path handed to the archiver (not used in DELETE mode)
+     * @param option options providing `cleanSource` and `sourceArchiveDir`
+     * @param hadoopConf configuration used to resolve the archive directory's file system
+     * @return a [[SourceFileArchiver]] for ARCHIVE, a [[SourceFileRemover]] for DELETE,
+     *         `None` for any other mode
+     * @throws IllegalArgumentException if the mode is ARCHIVE and `sourceArchiveDir` is not
+     *                                  defined
+     */
+    def apply(
+        fileSystem: FileSystem,
+        sourcePath: Path,
+        option: FileStreamOptions,
+        hadoopConf: Configuration): Option[FileStreamSourceCleaner] = option.cleanSource match {
+      case CleanSourceMode.ARCHIVE =>
+        // Fail with an explanatory message rather than a bare "requirement failed"; the
+        // exception type is the same one `require` would throw.
+        val archiveDir = option.sourceArchiveDir.getOrElse {
+          throw new IllegalArgumentException("requirement failed: sourceArchiveDir must be " +
+            "defined when cleanSource is ARCHIVE")
+        }
+        val path = new Path(archiveDir)
+        // Resolve the archive directory's own file system; SourceFileArchiver compares it
+        // against the source file system on construction.
+        val archiveFs = path.getFileSystem(hadoopConf)
+        val qualifiedArchivePath = archiveFs.makeQualified(path)
+        Some(new SourceFileArchiver(fileSystem, sourcePath, archiveFs, qualifiedArchivePath))
+
+      case CleanSourceMode.DELETE =>
+        Some(new SourceFileRemover(fileSystem))
+
+      case _ => None
+    }
+  }
+
+  private[sql] class SourceFileArchiver(
+      fileSystem: FileSystem,
+      sourcePath: Path,
+      baseArchiveFileSystem: FileSystem,
+      baseArchivePath: Path) extends FileStreamSourceCleaner with Logging {
+    assertParameters()
+
+    private def assertParameters(): Unit = {
+      require(fileSystem.getUri == baseArchiveFileSystem.getUri, "Base archive path is located " +
+        s"on a different file system than the source files. source path: $sourcePath" +
+        s" / base archive path: $baseArchivePath")
+
+      /**
+       * FileStreamSource reads a file when one of the conditions below is met:
+       * 1) file itself is matched with source path
+       * 2) parent directory is matched with source path
 
 Review comment:
   FYI, I just filed https://issues.apache.org/jira/browse/SPARK-30281 and raised a patch that picks option 2: #26845

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org