Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/02/18 13:46:35 UTC

[GitHub] [spark] HeartSaVioR opened a new pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

HeartSaVioR opened a new pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620
 
 
   ### What changes were proposed in this pull request?
   
   This patch caches the fetched list of files in FileStreamSource to avoid re-fetching whenever possible.
   
   This improvement is effective when the source options are set as follows (a usage sketch is shown right after the list):
    
   * `maxFilesPerTrigger` is set
   * `latestFirst` is set to `false` (default)
   
   because
   
   * if `maxFilesPerTrigger` is unset, Spark processes all new files within a single batch
   * if `latestFirst` is set to `true`, the intent is to process the "latest" files, which forces Spark to refresh the listing for every batch
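   
   For reference, a minimal usage sketch (not part of the original PR text) of a file-source query that satisfies both conditions; the input path and console sink are placeholders and assume an existing SparkSession named `spark`:
   
   ```scala
   // Minimal usage sketch; the input path and console sink are placeholders, not from the PR.
   val df = spark.readStream
     .format("text")
     .option("maxFilesPerTrigger", "5")  // bound the number of files per micro-batch
     .option("latestFirst", "false")     // default: process files in timestamp order
     .load("/data/landing/*")

   val query = df.writeStream
     .format("console")
     .start()
   ```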
   
   The fetched list of files is filtered against SeenFilesMap before caching, so unnecessary files are dropped in that phase. Once a file is cached we don't check it against `isNewFile` again: Spark processes files in timestamp order, so cached files have a timestamp equal to or later than latestTimestamp in SeenFilesMap.
   
   The cache is kept only in memory to simplify the logic; if we supported restoring the cache when restarting a query, we would also have to deal with changes to the source options.
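   
   A rough, self-contained sketch of the idea described above (the names `listAllFiles`, `isNewFile`, and `unreadFiles` are illustrative, not the actual FileStreamSource internals):
   
   ```scala
   object UnreadFilesCacheSketch {
     type FileWithTs = (String, Long) // (path, last-modified timestamp)

     // in-memory only: the cached remainder is lost when the query restarts
     private var unreadFiles: Seq[FileWithTs] = Nil

     def nextBatch(
         listAllFiles: () => Seq[FileWithTs],
         isNewFile: FileWithTs => Boolean,
         maxFilesPerTrigger: Int): Seq[FileWithTs] = {
       val candidates =
         if (unreadFiles.nonEmpty) {
           unreadFiles // reuse the cached, already-filtered remainder; no listing needed
         } else {
           // expensive listing, filtered against SeenFilesMap before caching
           listAllFiles().filter(isNewFile).sortBy(_._2)
         }
       val (batch, remainder) = candidates.splitAt(maxFilesPerTrigger)
       unreadFiles = remainder // keep the leftover entries for the next micro-batch
       batch
     }
   }
   ```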
   
   ### Why are the changes needed?
   
   Spark pays a high cost to fetch the list of files from the input paths, but uses only a slice of that list per batch. If the streaming query starts from a large amount of input data for whatever reason (initial load, reprocessing, etc.), the fetch cost is paid for every batch, since it is unusual to let the first micro-batch process the entire initial load.
   
   SPARK-20568 will help reduce the fetch cost, as processed files will be either deleted or moved outside the input paths, but it still won't help in the early phase.
   
   ### Does this PR introduce any user-facing change?
   
   Yes. The driver process may require more memory than before to cache the fetched files when maxFilesPerTrigger is set and latestFirst is set to "false". Previously Spark took only some files from the front of the list and discarded the rest, so technically the peak memory is the same, but the discarded entries could be freed sooner.
   
   It should not hurt much: the peak memory is still similar, and a similar amount of memory is required anyway when maxFilesPerTrigger is unset.
   
   ### How was this patch tested?
   
   New unit tests.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614458634
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/26037/
   Test PASSed.



[GitHub] [spark] gaborgsomogyi commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#discussion_r409583837
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 @@ -1935,6 +1928,120 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(expectedDir.exists())
     assert(expectedDir.list().exists(_.startsWith(filePrefix)))
   }
+
+  private def withCountListingLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = {
+    val optionKey = s"fs.${CountListingLocalFileSystem.scheme}.impl"
+    val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
+    try {
+      spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName)
+      body
+    } finally {
+      originClassForLocalFileSystem match {
+        case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
+        case _ => spark.conf.unset(optionKey)
+      }
+    }
+  }
+
+  test("Caches and leverages unread files") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "5")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+        val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+        val metadataLog = source invokePrivate _metadataLog()
+
+        def verifyBatch(
+            offset: FileStreamSourceOffset,
+            expectedBatchId: Long,
+            inputFiles: Seq[File],
+            expectedListingCount: Int): Unit = {
+          val batchId = offset.logOffset
+          assert(batchId === expectedBatchId)
+
+          val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
+          assert(files.forall(_.batchId == batchId))
+
+          val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath }
+          val expectedInputFiles = inputFiles.slice(batchId.toInt * 5, batchId.toInt * 5 + 5)
+            .map(_.getCanonicalPath)
+          assert(actualInputFiles === expectedInputFiles)
+
+          assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled
+            .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+        }
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        var lastModified = 0
+        val inputFiles = (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(lastModified)
+          lastModified += 10000
+          f
+        }
+
+        // 4 batches will be available for 20 input files
+        (0 to 3).foreach { batchId =>
+          val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+            .asInstanceOf[FileStreamSourceOffset]
+          verifyBatch(offsetBatch, expectedBatchId = batchId, inputFiles, expectedListingCount = 1)
+        }
+
+        val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        // latestOffset returns the offset for previous batch which means no new batch is presented
+        assert(3 === offsetBatch.logOffset)
+        // listing should be performed after the list of unread files are exhausted
+        assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+      }
+    }
+  }
+
+  test("Don't cache unread files when latestFirst is true") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "true", "maxFilesPerTrigger" -> "5")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        var lastModified = 0
+        (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(lastModified)
+          lastModified += 10000
+          f
+        }
+
+        source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled
 
 Review comment:
   What I meant here is that the test should fail if some nasty code puts irrelevant data into the map. For example, when I put the following (just for the sake of illustration):
   ```
           CountListingLocalFileSystem.resetCount()
           CountListingLocalFileSystem.pathToNumListStatusCalled.put("foo", new AtomicLong(1))
   ```
   it would be good to fail.
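   
   For illustration only (this is not the committed test code), one way to enforce that is to require the map to hold exactly the expected source path:
   
   ```scala
   // Illustrative only: unrelated entries (like the "foo" example above) make the test fail,
   // because the listing-count map must contain the source path and nothing else.
   assert(CountListingLocalFileSystem.pathToNumListStatusCalled.keySet ===
     Set(src.getCanonicalPath))
   ```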
   



[GitHub] [spark] SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614461556
 
 
   **[Test build #121353 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121353/testReport)** for PR 27620 at commit [`07eed68`](https://github.com/apache/spark/commit/07eed68a03895ac677a740360e2eb0996ab697f6).



[GitHub] [spark] SparkQA removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613136574
 
 
   **[Test build #121231 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121231/testReport)** for PR 27620 at commit [`b417911`](https://github.com/apache/spark/commit/b417911356356d35abbad768bf583b55a36d25cf).



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614458620
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614607414
 
 
   **[Test build #121353 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121353/testReport)** for PR 27620 at commit [`07eed68`](https://github.com/apache/spark/commit/07eed68a03895ac677a740360e2eb0996ab697f6).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613136907
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614392168
 
 
   > Just wondering what would happen in the following scenario?
   > 
   > "latestFirst" -> "true"
   > "maxFilesPerTrigger" -> "5"
   > 6 files are available and 5 processed in batch0 -> 1 stored in unreadFiles
   > 1 new file arrives
   > batch1 processed in next round
   > The question is with what content will be batch1 executed?
   
   I've explained the conditions under which the functionality takes effect in the PR description - it won't cache the list of files if latestFirst is true, so the behavior stays the same as it is now.
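   
   A minimal sketch of that gating condition (the function and parameter names are illustrative, not the actual FileStreamSource fields):
   
   ```scala
   // Illustrative only: caching the remainder is enabled exactly when both conditions
   // from the PR description hold.
   def shouldCacheUnreadFiles(maxFilesPerTrigger: Option[Int], latestFirst: Boolean): Boolean =
     maxFilesPerTrigger.isDefined && !latestFirst

   // With latestFirst = true the newest files must be re-listed on every trigger,
   // so the remainder of the listing is discarded, exactly as before this patch.
   ```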



[GitHub] [spark] SparkQA removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587470452
 
 
   **[Test build #118640 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/118640/testReport)** for PR 27620 at commit [`b417911`](https://github.com/apache/spark/commit/b417911356356d35abbad768bf583b55a36d25cf).



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615436453
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/121415/
   Test PASSed.



[GitHub] [spark] SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587470452
 
 
   **[Test build #118640 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/118640/testReport)** for PR 27620 at commit [`b417911`](https://github.com/apache/spark/commit/b417911356356d35abbad768bf583b55a36d25cf).



[GitHub] [spark] SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613855732
 
 
   **[Test build #121303 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121303/testReport)** for PR 27620 at commit [`b417911`](https://github.com/apache/spark/commit/b417911356356d35abbad768bf583b55a36d25cf).



[GitHub] [spark] HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613853337
 
 
   retest this, please



[GitHub] [spark] HeartSaVioR commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#discussion_r409988154
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 (quoted test context identical to the earlier review comment on this file; omitted here)
 Review comment:
   Your example now fails, because I added a check on the number of entries in pathToNumListStatusCalled. Does that address your comment?



[GitHub] [spark] SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613136574
 
 
   **[Test build #121231 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121231/testReport)** for PR 27620 at commit [`b417911`](https://github.com/apache/spark/commit/b417911356356d35abbad768bf583b55a36d25cf).



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615081061
 
 
   Merged build finished. Test FAILed.



[GitHub] [spark] SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615435529
 
 
   **[Test build #121415 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121415/testReport)** for PR 27620 at commit [`8251b74`](https://github.com/apache/spark/commit/8251b744d40f4f8744df53d68842894489808c2b).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615297996
 
 
   **[Test build #121415 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121415/testReport)** for PR 27620 at commit [`8251b74`](https://github.com/apache/spark/commit/8251b744d40f4f8744df53d68842894489808c2b).



[GitHub] [spark] HeartSaVioR edited a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587471848
 
 
   The patch is actually very straightforward in how it works and how it helps (the changeset, excluding the test code, is very small).
   
   I'll attach the test result for the "initial load" use case to the "How was this patch tested?" section soon. I already have screenshots of the UI, but would like to run against the latest master.
   
   EDIT: Just updated the PR description.



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614608335
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/121353/
   Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613136911
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/25919/
   Test PASSed.



[GitHub] [spark] gaborgsomogyi commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#discussion_r408784236
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 @@ -1980,3 +2089,23 @@ class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
 object ExistsThrowsExceptionFileSystem {
   val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
 }
+
+class CountListingLocalFileSystem extends RawLocalFileSystem {
+  import CountListingLocalFileSystem._
+
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+
+  override def listStatus(f: Path): Array[FileStatus] = {
+    val path = f.toUri.getPath
+    val curVal = pathToNumListStatusCalled.getOrElseUpdate(path, new AtomicLong(0))
+    curVal.incrementAndGet()
+    super.listStatus(f)
+  }
+}
+
+object CountListingLocalFileSystem {
+  val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
 
 Review comment:
   Maybe we can use the object name since there are multiple filesystems declared here?



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614458620
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] SparkQA removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613855732
 
 
   **[Test build #121303 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121303/testReport)** for PR 27620 at commit [`b417911`](https://github.com/apache/spark/commit/b417911356356d35abbad768bf583b55a36d25cf).



[GitHub] [spark] gaborgsomogyi commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#discussion_r408818460
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 (quoted test context identical to the earlier review comments on this file; omitted here)
 Review comment:
   Maybe it's worth checking that nothing irrelevant is inside. This probably indicates the need for some reset functionality for `pathToNumListStatusCalled`...
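   
   A hypothetical sketch of what such a reset helper could look like (the exact declaration of `pathToNumListStatusCalled` is not shown in this thread, so the map type and the helper are assumptions, not the committed code):
   
   ```scala
   import java.util.concurrent.atomic.AtomicLong

   import scala.collection.concurrent.TrieMap
   import scala.util.Random

   object CountListingLocalFileSystemSketch {
     val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"

     // thread-safe map from listed path to the number of listStatus calls observed
     val pathToNumListStatusCalled = TrieMap.empty[String, AtomicLong]

     // reset between tests so earlier tests cannot leak counts into later assertions
     def resetCount(): Unit = pathToNumListStatusCalled.clear()
   }
   ```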



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613856201
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/25987/
   Test PASSed.



[GitHub] [spark] HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615295573
 
 
   Just addressed the lower bound on unseen files - the ratio threshold is set to 0.2 for now, and we can adjust it later if we find a better value (or even a better condition).
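   
   One possible reading of this in code (a sketch under assumed names; the exact discard condition in the PR may differ): if the cached remainder becomes too small relative to maxFilesPerTrigger, drop it so the next trigger performs a fresh listing instead of running an unnecessarily small batch.
   
   ```scala
   // Sketch only; the 0.2 threshold comes from the comment above, the rest is assumed.
   val DiscardUnseenFilesRatio = 0.2

   def cacheOrDiscard(remainder: Seq[String], maxFilesPerTrigger: Int): Option[Seq[String]] =
     if (remainder.size < maxFilesPerTrigger * DiscardUnseenFilesRatio) {
       None            // cache too small: re-list the input paths on the next trigger
     } else {
       Some(remainder) // keep serving micro-batches from the cached remainder
     }
   ```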



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615081061
 
 
   Merged build finished. Test FAILed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587470987
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613136911
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/25919/
   Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614400225
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615038431
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614400228
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/26032/
   Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615436453
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/121415/
   Test PASSed.



[GitHub] [spark] gaborgsomogyi commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#discussion_r408826170
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 @@ -1935,6 +1928,120 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(expectedDir.exists())
     assert(expectedDir.list().exists(_.startsWith(filePrefix)))
   }
+
+  private def withCountListingLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = {
+    val optionKey = s"fs.${CountListingLocalFileSystem.scheme}.impl"
+    val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
+    try {
+      spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName)
+      body
+    } finally {
+      originClassForLocalFileSystem match {
+        case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
+        case _ => spark.conf.unset(optionKey)
+      }
+    }
+  }
+
+  test("Caches and leverages unread files") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "5")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+        val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+        val metadataLog = source invokePrivate _metadataLog()
+
+        def verifyBatch(
+            offset: FileStreamSourceOffset,
+            expectedBatchId: Long,
+            inputFiles: Seq[File],
+            expectedListingCount: Int): Unit = {
+          val batchId = offset.logOffset
+          assert(batchId === expectedBatchId)
+
+          val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
+          assert(files.forall(_.batchId == batchId))
+
+          val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath }
+          val expectedInputFiles = inputFiles.slice(batchId.toInt * 5, batchId.toInt * 5 + 5)
+            .map(_.getCanonicalPath)
+          assert(actualInputFiles === expectedInputFiles)
+
+          assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled
+            .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+        }
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        var lastModified = 0
+        val inputFiles = (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(lastModified)
 
 Review comment:
   Maybe `idx * 10000`?
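   Applied to the snippet above, the suggestion would look roughly like this (reusing the test's `createFile`, `src` and `tmp` helpers; a sketch, not the final change):
   
   ```scala
   // Derive the timestamp from the index instead of mutating a counter;
   // timestamps stay strictly increasing, so the ordering guarantee is kept.
   val inputFiles = (0 to 19).map { idx =>
     val f = createFile(idx.toString, new File(src, idx.toString), tmp)
     f.setLastModified(idx * 10000L)
     f
   }
   ```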



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614455741
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/121348/
   Test FAILed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615038431
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614455451
 
 
   **[Test build #121348 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121348/testReport)** for PR 27620 at commit [`07eed68`](https://github.com/apache/spark/commit/07eed68a03895ac677a740360e2eb0996ab697f6).
    * This patch **fails due to an unknown error code, -9**.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613211717
 
 
   **[Test build #121231 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121231/testReport)** for PR 27620 at commit [`b417911`](https://github.com/apache/spark/commit/b417911356356d35abbad768bf583b55a36d25cf).
    * This patch **fails PySpark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615037988
 
 
   **[Test build #121395 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121395/testReport)** for PR 27620 at commit [`57981cd`](https://github.com/apache/spark/commit/57981cd45eed8cc16389468dc790fd27bde18f7d).



[GitHub] [spark] HeartSaVioR edited a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615295573
 
 
   Just addressed the lower bar for unseen files - the threshold ratio is set to 0.2 (20%) of max files for now, and we can adjust it later if we find a better value (or even a better condition).
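   A minimal sketch of what such a lower bar could look like (the names, the generic element type, and the hard-coded 0.2 ratio are illustrative only, not the exact code in this patch):
   
   ```scala
   // Serve the next batch from the cached unread files unless too few remain;
   // in that case signal the caller to discard the cache and re-list the input paths.
   val discardCachedFilesRatio = 0.2 // 20% of maxFilesPerTrigger, tunable
   
   def takeFromCache[T](unreadFiles: Seq[T], maxFiles: Int): Option[Seq[T]] = {
     if (unreadFiles.size >= maxFiles * discardCachedFilesRatio) {
       Some(unreadFiles.take(maxFiles)) // enough cached files: skip the list operation
     } else {
       None // too few left: fall back to a fresh listing
     }
   }
   ```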



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613856192
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615298751
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] SparkQA removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615297996
 
 
   **[Test build #121415 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121415/testReport)** for PR 27620 at commit [`8251b74`](https://github.com/apache/spark/commit/8251b744d40f4f8744df53d68842894489808c2b).



[GitHub] [spark] gaborgsomogyi commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#discussion_r408782750
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 @@ -1980,3 +2089,23 @@ class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
 object ExistsThrowsExceptionFileSystem {
   val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
 }
+
+class CountListingLocalFileSystem extends RawLocalFileSystem {
+  import CountListingLocalFileSystem._
+
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+
+  override def listStatus(f: Path): Array[FileStatus] = {
+    val path = f.toUri.getPath
 
 Review comment:
   Nit: `f.toUri.getPath` can be inlined.
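   A sketch of the inlined version (the method body here is inferred from the counter assertions in the tests, not shown in this diff, so treat it as an assumption):
   
   ```scala
   override def listStatus(f: Path): Array[FileStatus] = {
     // `path` was only used once, so f.toUri.getPath can be passed directly
     // when bumping the per-path listing counter.
     pathToNumListStatusCalled
       .getOrElseUpdate(f.toUri.getPath, new AtomicLong(0))
       .incrementAndGet()
     super.listStatus(f)
   }
   ```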



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614458634
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/26037/
   Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614608324
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] gaborgsomogyi edited a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi edited a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615182329
 
 
   Hmm, it seems the issue is relevant.
   
   `maybe it's good to add a lower bar to avoid the weird case, listing files provides slightly more than maxFilesPerTrigger.`
   
   +1 on this. Maybe we can add a new test to cover this case.
   



[GitHub] [spark] HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614458205
 
 
   retest this, please



[GitHub] [spark] HeartSaVioR commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#discussion_r410273388
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 @@ -1935,6 +1928,120 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(expectedDir.exists())
     assert(expectedDir.list().exists(_.startsWith(filePrefix)))
   }
+
+  private def withCountListingLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = {
+    val optionKey = s"fs.${CountListingLocalFileSystem.scheme}.impl"
+    val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
+    try {
+      spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName)
+      body
+    } finally {
+      originClassForLocalFileSystem match {
+        case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
+        case _ => spark.conf.unset(optionKey)
+      }
+    }
+  }
+
+  test("Caches and leverages unread files") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "5")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+        val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+        val metadataLog = source invokePrivate _metadataLog()
+
+        def verifyBatch(
+            offset: FileStreamSourceOffset,
+            expectedBatchId: Long,
+            inputFiles: Seq[File],
+            expectedListingCount: Int): Unit = {
+          val batchId = offset.logOffset
+          assert(batchId === expectedBatchId)
+
+          val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
+          assert(files.forall(_.batchId == batchId))
+
+          val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath }
+          val expectedInputFiles = inputFiles.slice(batchId.toInt * 5, batchId.toInt * 5 + 5)
+            .map(_.getCanonicalPath)
+          assert(actualInputFiles === expectedInputFiles)
+
+          assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled
+            .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+        }
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        var lastModified = 0
+        val inputFiles = (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(lastModified)
+          lastModified += 10000
+          f
+        }
+
+        // 4 batches will be available for 20 input files
+        (0 to 3).foreach { batchId =>
+          val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+            .asInstanceOf[FileStreamSourceOffset]
+          verifyBatch(offsetBatch, expectedBatchId = batchId, inputFiles, expectedListingCount = 1)
+        }
+
+        val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        // latestOffset returns the offset for previous batch which means no new batch is presented
+        assert(3 === offsetBatch.logOffset)
+        // listing should be performed after the list of unread files are exhausted
+        assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+      }
+    }
+  }
+
+  test("Don't cache unread files when latestFirst is true") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "true", "maxFilesPerTrigger" -> "5")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        var lastModified = 0
+        (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(lastModified)
+          lastModified += 10000
+          f
+        }
+
+        source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled
 
 Review comment:
   Sorry, I have to revert it. My bad. I remembered why I only checked the directory - this would require verifying all input files, which is actually redundant, as we already verify that behavior in the UT "Caches and leverages unread files".



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613212059
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/121231/
   Test FAILed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587470996
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/23393/
   Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614455728
 
 
   Merged build finished. Test FAILed.



[GitHub] [spark] SparkQA removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615037988
 
 
   **[Test build #121395 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121395/testReport)** for PR 27620 at commit [`57981cd`](https://github.com/apache/spark/commit/57981cd45eed8cc16389468dc790fd27bde18f7d).



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615038435
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/26078/
   Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615436445
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614656637
 
 
   Only the one file left unread will be used for the batch in that case.
   
   It's designed to avoid calling the list operation whenever possible, but in some cases it might be valid to drop the unread files and call the list operation when the number of remaining files is relatively small compared to the max files per trigger. I think that affects only a few batches, though.
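   To make that concrete with purely illustrative numbers (and before any lower bar is applied): with `maxFilesPerTrigger` = 5 and a single listing that returns 21 files, the first four batches of 5 files are all served from the cache, the fifth batch contains only the one remaining cached file, and a fresh listing is performed only after that.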



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587470987
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] gaborgsomogyi commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614663610
 
 
   I've double-checked the `maxFilesPerTrigger` semantics and it is only the maximum number of files to consider, so this doesn't break it. Since it affects only a small number of batches, I agree that the overall gain is positive.
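   For reference, a minimal sketch of how the option is typically set (format and path here are illustrative, not taken from this PR):
   
   ```scala
   // maxFilesPerTrigger is an upper bound on the files a micro-batch may read;
   // it does not promise that every batch contains exactly that many files.
   val stream = spark.readStream
     .format("text")
     .option("maxFilesPerTrigger", "5")
     .load("/data/incoming")
   ```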



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614608324
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615436445
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587599159
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] gaborgsomogyi commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#discussion_r408826490
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 @@ -1935,6 +1928,120 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(expectedDir.exists())
     assert(expectedDir.list().exists(_.startsWith(filePrefix)))
   }
+
+  private def withCountListingLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = {
+    val optionKey = s"fs.${CountListingLocalFileSystem.scheme}.impl"
+    val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
+    try {
+      spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName)
+      body
+    } finally {
+      originClassForLocalFileSystem match {
+        case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
+        case _ => spark.conf.unset(optionKey)
+      }
+    }
+  }
+
+  test("Caches and leverages unread files") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "5")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+        val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+        val metadataLog = source invokePrivate _metadataLog()
+
+        def verifyBatch(
+            offset: FileStreamSourceOffset,
+            expectedBatchId: Long,
+            inputFiles: Seq[File],
+            expectedListingCount: Int): Unit = {
+          val batchId = offset.logOffset
+          assert(batchId === expectedBatchId)
+
+          val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
+          assert(files.forall(_.batchId == batchId))
+
+          val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath }
+          val expectedInputFiles = inputFiles.slice(batchId.toInt * 5, batchId.toInt * 5 + 5)
+            .map(_.getCanonicalPath)
+          assert(actualInputFiles === expectedInputFiles)
+
+          assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled
+            .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+        }
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        var lastModified = 0
+        val inputFiles = (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(lastModified)
+          lastModified += 10000
+          f
+        }
+
+        // 4 batches will be available for 20 input files
+        (0 to 3).foreach { batchId =>
+          val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+            .asInstanceOf[FileStreamSourceOffset]
+          verifyBatch(offsetBatch, expectedBatchId = batchId, inputFiles, expectedListingCount = 1)
+        }
+
+        val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        // latestOffset returns the offset for previous batch which means no new batch is presented
+        assert(3 === offsetBatch.logOffset)
+        // listing should be performed after the list of unread files are exhausted
+        assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+      }
+    }
+  }
+
+  test("Don't cache unread files when latestFirst is true") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "true", "maxFilesPerTrigger" -> "5")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        var lastModified = 0
+        (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(lastModified)
 
 Review comment:
   Same here.



[GitHub] [spark] gaborgsomogyi commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615182329
 
 
   Hmm, it seems the issue is relevant.
   
   `maybe it's good to add a lower bar to avoid the weird case, listing files provides slightly more than maxFilesPerTrigger.`
   
   +1 on this.
   



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614400228
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/26032/
   Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587599159
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] HeartSaVioR edited a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587471848
 
 
   The patch is actually very straightforward in how it works and how it helps (the changeset, excluding the test code, is very small).
   
   I'll attach the test result for the "initial load" use case in the "How was this patch tested?" section soon. I already have screenshots of the UI, but I'd like to run it against the latest master.



[GitHub] [spark] HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587471848
 
 
   The patch is actually very straightforward in how it works and how it helps (the changeset, excluding the test code, is very small).
   
   I'll attach the test result for the "initial load" use case in the "How was this patch tested?" section soon. I already have screenshots of the UI, but I'd like to run it again against the latest master.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613856192
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614006872
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587599168
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/118640/
   Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615081069
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/121395/
   Test FAILed.



[GitHub] [spark] SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614005748
 
 
   **[Test build #121303 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121303/testReport)** for PR 27620 at commit [`b417911`](https://github.com/apache/spark/commit/b417911356356d35abbad768bf583b55a36d25cf).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.

[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615081069
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/121395/
   Test FAILed.

[GitHub] [spark] HeartSaVioR commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#discussion_r409989129
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 @@ -1935,6 +1928,120 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(expectedDir.exists())
     assert(expectedDir.list().exists(_.startsWith(filePrefix)))
   }
+
+  private def withCountListingLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = {
+    val optionKey = s"fs.${CountListingLocalFileSystem.scheme}.impl"
+    val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
+    try {
+      spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName)
+      body
+    } finally {
+      originClassForLocalFileSystem match {
+        case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
+        case _ => spark.conf.unset(optionKey)
+      }
+    }
+  }
+
+  test("Caches and leverages unread files") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "5")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+        val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+        val metadataLog = source invokePrivate _metadataLog()
+
+        def verifyBatch(
+            offset: FileStreamSourceOffset,
+            expectedBatchId: Long,
+            inputFiles: Seq[File],
+            expectedListingCount: Int): Unit = {
+          val batchId = offset.logOffset
+          assert(batchId === expectedBatchId)
+
+          val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
+          assert(files.forall(_.batchId == batchId))
+
+          val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath }
+          val expectedInputFiles = inputFiles.slice(batchId.toInt * 5, batchId.toInt * 5 + 5)
+            .map(_.getCanonicalPath)
+          assert(actualInputFiles === expectedInputFiles)
+
+          assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled
+            .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+        }
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        var lastModified = 0
+        val inputFiles = (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(lastModified)
+          lastModified += 10000
+          f
+        }
+
+        // 4 batches will be available for 20 input files
+        (0 to 3).foreach { batchId =>
+          val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+            .asInstanceOf[FileStreamSourceOffset]
+          verifyBatch(offsetBatch, expectedBatchId = batchId, inputFiles, expectedListingCount = 1)
+        }
+
+        val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        // latestOffset returns the offset for previous batch which means no new batch is presented
+        assert(3 === offsetBatch.logOffset)
+        // listing should be performed after the list of unread files are exhausted
+        assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+      }
+    }
+  }
+
+  test("Don't cache unread files when latestFirst is true") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "true", "maxFilesPerTrigger" -> "5")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        var lastModified = 0
+        (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(lastModified)
+          lastModified += 10000
+          f
+        }
+
+        source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled
 
 Review comment:
   Sigh, I realized I didn't push the change. Sorry about that. Will push.

[GitHub] [spark] SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587598410
 
 
   **[Test build #118640 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/118640/testReport)** for PR 27620 at commit [`b417911`](https://github.com/apache/spark/commit/b417911356356d35abbad768bf583b55a36d25cf).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.

[GitHub] [spark] SparkQA removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614461556
 
 
   **[Test build #121353 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121353/testReport)** for PR 27620 at commit [`07eed68`](https://github.com/apache/spark/commit/07eed68a03895ac677a740360e2eb0996ab697f6).

[GitHub] [spark] HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615043179
 
 
   Hmm... I thought about that more, and maybe it's good to add a lower bar to avoid the weird case where listing returns only slightly more files than maxFilesPerTrigger. The tricky part is deciding on the condition for discarding unread files (a ratio based on maxFilesPerTrigger? a static number?); the logic to add would be straightforward.
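   To make the idea concrete, here is a minimal sketch of such a guard, assuming hypothetical names (`UnreadFilesGuard`, `DiscardUnseenRatio`, `cacheOrDiscard`) and a simplified `FileEntry`; it is one possible shape of the condition, not the actual implementation:
   
   ```scala
   // Sketch only: decide whether the files left over after taking
   // maxFilesPerTrigger are worth caching, or whether they are so few that
   // re-listing on the next trigger is cheaper. All names are illustrative.
   object UnreadFilesGuard {
     // Hypothetical knob: a leftover smaller than this fraction of
     // maxFilesPerTrigger gets discarded instead of cached.
     val DiscardUnseenRatio = 0.2
   
     final case class FileEntry(path: String, timestamp: Long)
   
     def cacheOrDiscard(
         unseen: Seq[FileEntry],
         maxFilesPerTrigger: Int): Option[Seq[FileEntry]] = {
       val threshold = (maxFilesPerTrigger * DiscardUnseenRatio).toInt
       // Cache only when the leftover can serve a meaningful part of the
       // next trigger; otherwise fall back to a fresh listing.
       if (unseen.size > threshold) Some(unseen) else None
     }
   }
   ```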

[GitHub] [spark] gaborgsomogyi commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614620976
 
 
   > > Just wondering what would happen in the following scenario?
   > > "latestFirst" -> "true"
   > > "maxFilesPerTrigger" -> "5"
   > > 6 files are available and 5 processed in batch0 -> 1 stored in unreadFiles
   > > 1 new file arrives
   > > batch1 processed in next round
   > > The question is with what content will be batch1 executed?
   > 
   > I've explained the condition when the functionality takes effect in the description of PR - it won't cache the list of files if latestFirst is true, so it should be same as it is.
   
   I wanted to write `"latestFirst" -> "false"`, but with the corrected config my question still stands.
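   For reference, the setup in that scenario (with the corrected `latestFirst`) would look roughly like the snippet below, assuming an existing SparkSession `spark` and a placeholder input path; it only restates the options under discussion and does not assert how batch1 behaves:
   
   ```scala
   // Placeholder path; only illustrates the source options in the scenario.
   val df = spark.readStream
     .format("text")
     .option("maxFilesPerTrigger", "5")   // cap each micro-batch at 5 files
     .option("latestFirst", "false")      // oldest files first (the default)
     .load("/path/to/input")
   ```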

[GitHub] [spark] SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614399874
 
 
   **[Test build #121348 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121348/testReport)** for PR 27620 at commit [`07eed68`](https://github.com/apache/spark/commit/07eed68a03895ac677a740360e2eb0996ab697f6).

[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613136907
 
 
   Merged build finished. Test PASSed.

[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615038435
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/26078/
   Test PASSed.

[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587599168
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/118640/
   Test PASSed.

[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614455741
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/121348/
   Test FAILed.

[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613856201
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/25987/
   Test PASSed.

[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613212052
 
 
   Merged build finished. Test FAILed.

[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615298764
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/26100/
   Test PASSed.

[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615298751
 
 
   Merged build finished. Test PASSed.

[GitHub] [spark] HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613135266
 
 
   retest this, please

[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614455728
 
 
   Merged build finished. Test FAILed.

[GitHub] [spark] HeartSaVioR commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#discussion_r409257481
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 @@ -1935,6 +1928,120 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(expectedDir.exists())
     assert(expectedDir.list().exists(_.startsWith(filePrefix)))
   }
+
+  private def withCountListingLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = {
+    val optionKey = s"fs.${CountListingLocalFileSystem.scheme}.impl"
+    val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
+    try {
+      spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName)
+      body
+    } finally {
+      originClassForLocalFileSystem match {
+        case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
+        case _ => spark.conf.unset(optionKey)
+      }
+    }
+  }
+
+  test("Caches and leverages unread files") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "5")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+        val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+        val metadataLog = source invokePrivate _metadataLog()
+
+        def verifyBatch(
+            offset: FileStreamSourceOffset,
+            expectedBatchId: Long,
+            inputFiles: Seq[File],
+            expectedListingCount: Int): Unit = {
+          val batchId = offset.logOffset
+          assert(batchId === expectedBatchId)
+
+          val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
+          assert(files.forall(_.batchId == batchId))
+
+          val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath }
+          val expectedInputFiles = inputFiles.slice(batchId.toInt * 5, batchId.toInt * 5 + 5)
+            .map(_.getCanonicalPath)
+          assert(actualInputFiles === expectedInputFiles)
+
+          assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled
+            .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+        }
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        var lastModified = 0
+        val inputFiles = (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(lastModified)
 
 Review comment:
   Nice catch. I guess I used the variable earlier and forgot to clean it up once it was no longer needed.

[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614006879
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/121303/
   Test PASSed.

[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613212052
 
 
   Merged build finished. Test FAILed.

[GitHub] [spark] SparkQA removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614399874
 
 
   **[Test build #121348 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121348/testReport)** for PR 27620 at commit [`07eed68`](https://github.com/apache/spark/commit/07eed68a03895ac677a740360e2eb0996ab697f6).

[GitHub] [spark] HeartSaVioR commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#discussion_r382452458
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 @@ -1980,3 +2089,23 @@ class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
 object ExistsThrowsExceptionFileSystem {
   val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
 }
+
+class CountListingLocalFileSystem extends RawLocalFileSystem {
 
 Review comment:
   The FileSystem-related code I add here is very similar to what I add in #27664. Whichever one gets merged first, I'll rebase the other and deduplicate it.
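   Deduplicating would presumably mean moving the counting file system into a shared test helper that both suites can register under their own scheme; a rough sketch of that shape (names like `CountingLocalFileSystem` and `resetCount` are hypothetical, not the actual follow-up change) could look like:
   
   ```scala
   import java.net.URI
   import java.util.concurrent.atomic.AtomicLong
   import scala.collection.concurrent.TrieMap
   import scala.util.Random
   import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
   
   // Shared test-only file system that counts listStatus calls per path.
   class CountingLocalFileSystem extends RawLocalFileSystem {
     import CountingLocalFileSystem._
   
     override def getUri: URI = URI.create(s"$scheme:///")
   
     override def listStatus(f: Path): Array[FileStatus] = {
       pathToNumListStatusCalled
         .getOrElseUpdate(f.toUri.getPath, new AtomicLong(0))
         .incrementAndGet()
       super.listStatus(f)
     }
   }
   
   object CountingLocalFileSystem {
     // Randomized scheme so concurrently running suites don't collide.
     val scheme = s"CountingLocalFileSystem${math.abs(Random.nextInt)}fs"
     val pathToNumListStatusCalled = TrieMap.empty[String, AtomicLong]
   
     def resetCount(): Unit = pathToNumListStatusCalled.clear()
   }
   ```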

[GitHub] [spark] HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587504565
 
 
   cc. @tdas @zsxwing @gaborgsomogyi 

[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-587470996
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/23393/
   Test PASSed.

[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615298764
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/26100/
   Test PASSed.

[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614006879
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/121303/
   Test PASSed.

[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614400225
 
 
   Merged build finished. Test PASSed.

[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614608335
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/121353/
   Test PASSed.

[GitHub] [spark] AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-614006872
 
 
   Merged build finished. Test PASSed.

[GitHub] [spark] HeartSaVioR commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#discussion_r409256856
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ##########
 @@ -1980,3 +2089,23 @@ class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
 object ExistsThrowsExceptionFileSystem {
   val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
 }
+
+class CountListingLocalFileSystem extends RawLocalFileSystem {
+  import CountListingLocalFileSystem._
+
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+
+  override def listStatus(f: Path): Array[FileStatus] = {
+    val path = f.toUri.getPath
+    val curVal = pathToNumListStatusCalled.getOrElseUpdate(path, new AtomicLong(0))
+    curVal.incrementAndGet()
+    super.listStatus(f)
+  }
+}
+
+object CountListingLocalFileSystem {
+  val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
 
 Review comment:
   Ah yes good point. Will do.

[GitHub] [spark] SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-615080873
 
 
   **[Test build #121395 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121395/testReport)** for PR 27620 at commit [`57981cd`](https://github.com/apache/spark/commit/57981cd45eed8cc16389468dc790fd27bde18f7d).
    * This patch **fails due to an unknown error code, -9**.
    * This patch merges cleanly.
    * This patch adds no public classes.

[GitHub] [spark] AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #27620: [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
URL: https://github.com/apache/spark/pull/27620#issuecomment-613212059
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/121231/
   Test FAILed.
