You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "MaxNevermind (via GitHub)" <gi...@apache.org> on 2024/01/09 06:34:12 UTC

[PR] [SPARK-XXXX][SS] Add maxBytesPerTrigger threshold [spark]

MaxNevermind opened a new pull request, #44636:
URL: https://github.com/apache/spark/pull/44636

   What changes were proposed in this pull request?
   This PR adds [Input Streaming Source's](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources) option maxBytesPerTrigger for limiting the total size of input files in a streaming batch. Semantics of maxBytesPerTrigger is very close to already existing one maxFilesPerTrigger option.
   
   How a feature was implemented?
   Because maxBytesPerTrigger is semantically close to maxFilesPerTrigger I used all the maxFilesPerTrigger usages in the whole repository as a potential places that requires changes, that includes:
   
   Option paramater definition
   Option related logic
   Option related ScalaDoc and MD files
   Option related test
   I went over the usage of all usages of maxFilesPerTrigger in FileStreamSourceSuite and implemented maxBytesPerTrigger in the same fashion as those two are pretty close in their nature. From the structure and elements of ReadLimit I've concluded that current design implies only one simple rule for ReadLimit, so I openly prohibited the setting of both maxFilesPerTrigger and maxBytesPerTrigger at the same time.
   
   Why are the changes needed?
   This feature is useful for our and our sister teams and we expect it will find a broad acceptance among Spark users. We have a use-case in a few of the Spark pipelines we support when we use Available-now trigger for periodic processing using Spark Streaming. We use maxFilesPerTrigger threshold for now, but this is not ideal as Input file size might change with the time which requires periodic configuration adjustment of maxFilesPerTrigger. Computational complexity of the job depends on the event count/total size of the input and maxBytesPerTrigger is a better predictor of that than maxFilesPerTrigger.
   
   Does this PR introduce any user-facing change?
   Yes
   
   How was this patch tested?
   New unit tests were added or existing maxFilesPerTrigger test were extended. I searched maxFilesPerTrigger related test and added new tests or extended existing ones trying to minimize and simplify the changes.
   
   Was this patch authored or co-authored using generative AI tooling?
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1935366843

   @dongjoon-hyun 
   I think I don't have a write access to merge PR, see a screenshot bellow. Is there anything I need to do to attain it or maybe you can do it for me? Btw I guess commits need to be squashed.
   
   ![Screenshot 2024-02-08 at 9 37 47 PM](https://github.com/apache/spark/assets/4653936/318aca27-c337-4ccf-bcfa-1d031c4afe3e)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun closed pull request #44636: [SPARK-46641][SS] Add maxBytesPerTrigger threshold
URL: https://github.com/apache/spark/pull/44636


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448540749


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1154,25 +1154,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }
 
   test("max files per trigger") {
+    testThresholdLogic("maxFilesPerTrigger")
+  }
+
+  test("SPARK-46641: max bytes per trigger") {
+    testThresholdLogic("maxBytesPerTrigger")
+  }
+
+  private def testThresholdLogic(option: String): Unit = {
     withTempDir { case src =>
       var lastFileModTime: Option[Long] = None
 
       /** Create a text file with a single data item */
-      def createFile(data: Int): File = {
-        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
+      def createFile(data: String): File = {

Review Comment:
   May I ask why we need this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1449773817


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1300,6 +1350,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
           .option("checkpointLocation", checkpoint)
           .start(targetDir)
       }
+

Review Comment:
   fixed



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1323,6 +1374,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         q2.stop()
       }
     }
+

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448101682


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1153,174 +1153,213 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger") {
-    withTempDir { case src =>
-      var lastFileModTime: Option[Long] = None
+  test("maxFilesPerTrigger & maxBytesPerTrigger threshold logic must be obeyed") {

Review Comment:
   Please don't touch the existing old test case. Since we don't allow both threshold. The should be independently tested. The code duplication can be refactored into a test helper method instead of making a big single test case.
   
   Also, please use the test prefix like the following. 
   ```scala
   - test("maxFilesPerTrigger & maxBytesPerTrigger threshold logic must be obeyed") {
   + test("SPARK-46641: maxBytesPerTrigger threshold logic must be obeyed") {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448536927


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -166,6 +186,23 @@ class FileStreamSource(
         // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
         (newFiles.take(files.maxFiles()), null)
 
+      case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+        // we can cache and reuse remaining fetched list of files in further batches
+        val (bFiles, usFiles) = takeFilesUntilMax(newFiles, files.maxBytes())
+        if (usFiles.map(_.size).sum < files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO) {

Review Comment:
   `takeFilesUntilMax` had better returns the total size together. Then,
   - we can avoid this re-computation, `usFiles.map(_.size).sum`. 
   - we can avoid the chance of overflow here if `takeFilesUntilMax` guarantees that once.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448541966


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1154,25 +1154,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }
 
   test("max files per trigger") {
+    testThresholdLogic("maxFilesPerTrigger")
+  }
+
+  test("SPARK-46641: max bytes per trigger") {
+    testThresholdLogic("maxBytesPerTrigger")
+  }
+
+  private def testThresholdLogic(option: String): Unit = {
     withTempDir { case src =>
       var lastFileModTime: Option[Long] = None
 
       /** Create a text file with a single data item */
-      def createFile(data: Int): File = {
-        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
+      def createFile(data: String): File = {
+        val file = stringToFile(new File(src, s"$data.txt"), data)
         if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
         lastFileModTime = Some(file.lastModified)
         file
       }
 
-      createFile(1)
-      createFile(2)
-      createFile(3)
+      createFile("a")
+      createFile("b")
+      createFile("c")

Review Comment:
   If there is no reason, please recover the old way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1925531373

   We are already on this PR. Do you want to bring someone-else?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-XXXX][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1882510001

   > @MaxNevermind Could you create a Jira ticket and replace `SPARK-XXXX` with Jira ticket number in the PR title? Thanks.
   
   Will do, but I didn't have a ASF JIra account, I've just requested it using a ASF self service, it says that it will take few days to review my request.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448102701


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1153,174 +1153,213 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger") {
-    withTempDir { case src =>
-      var lastFileModTime: Option[Long] = None
+  test("maxFilesPerTrigger & maxBytesPerTrigger threshold logic must be obeyed") {
+    Seq(
+      "maxFilesPerTrigger",
+      "maxBytesPerTrigger"
+    ).foreach{ thresholdOption =>
+      withTempDir { case src =>
+        var lastFileModTime: Option[Long] = None
+
+        /** Create a text file with a single data item */
+        def createFile(data: String): File = {
+          val file = stringToFile(new File(src, s"$data.txt"), data)
+          if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
+          lastFileModTime = Some(file.lastModified)
+          file
+        }
 
-      /** Create a text file with a single data item */
-      def createFile(data: Int): File = {
-        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
-        if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
-        lastFileModTime = Some(file.lastModified)
-        file
-      }
+        createFile("a")
+        createFile("b")
+        createFile("c")
 
-      createFile(1)
-      createFile(2)
-      createFile(3)
+        // Set up a query to read text files 2 at a time
+        val df = spark
+          .readStream
+          .option(thresholdOption, 2)
+          .text(src.getCanonicalPath)
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName("file_data")
+          .start()
+          .asInstanceOf[StreamingQueryWrapper]
+          .streamingQuery
+        q.processAllAvailable()
+        val memorySink = q.sink.asInstanceOf[MemorySink]
+        val fileSource = getSourcesFromStreamingQuery(q).head
+
+        /** Check the data read in the last batch */
+        def checkLastBatchData(data: Char*): Unit = {
+          val schema = StructType(Seq(StructField("value", StringType)))
+          val df = spark.createDataFrame(
+            spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
+          checkAnswer(df, data.map(_.toString).toDF("value"))
+        }
 
-      // Set up a query to read text files 2 at a time
-      val df = spark
-        .readStream
-        .option("maxFilesPerTrigger", 2)
-        .text(src.getCanonicalPath)
-      val q = df
-        .writeStream
-        .format("memory")
-        .queryName("file_data")
-        .start()
-        .asInstanceOf[StreamingQueryWrapper]
-        .streamingQuery
-      q.processAllAvailable()
-      val memorySink = q.sink.asInstanceOf[MemorySink]
-      val fileSource = getSourcesFromStreamingQuery(q).head
-
-      /** Check the data read in the last batch */
-      def checkLastBatchData(data: Int*): Unit = {
-        val schema = StructType(Seq(StructField("value", StringType)))
-        val df = spark.createDataFrame(
-          spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
-        checkAnswer(df, data.map(_.toString).toDF("value"))
-      }
+        def checkAllData(data: Seq[Char]): Unit = {
+          val schema = StructType(Seq(StructField("value", StringType)))
+          val df = spark.createDataFrame(
+            spark.sparkContext.makeRDD(memorySink.allData), schema)
+          checkAnswer(df, data.map(_.toString).toDF("value"))
+        }
 
-      def checkAllData(data: Seq[Int]): Unit = {
-        val schema = StructType(Seq(StructField("value", StringType)))
-        val df = spark.createDataFrame(
-          spark.sparkContext.makeRDD(memorySink.allData), schema)
-        checkAnswer(df, data.map(_.toString).toDF("value"))
-      }
+        /** Check how many batches have executed since the last time this check was made */
+        var lastBatchId = -1L
+        def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
+          require(lastBatchId >= 0)
+          assert(memorySink.latestBatchId.get === lastBatchId + numBatches)
+          lastBatchId = memorySink.latestBatchId.get
+        }
 
-      /** Check how many batches have executed since the last time this check was made */
-      var lastBatchId = -1L
-      def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
-        require(lastBatchId >= 0)
-        assert(memorySink.latestBatchId.get === lastBatchId + numBatches)
+        checkLastBatchData('c')  // (a and b) should be in batch 1, (c) should be in batch 2 (last)
+        checkAllData('a' to 'c')
         lastBatchId = memorySink.latestBatchId.get
-      }
 
-      checkLastBatchData(3)  // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
-      checkAllData(1 to 3)
-      lastBatchId = memorySink.latestBatchId.get
+        fileSource.withBatchingLocked {
+          createFile("d")
+          createFile("e")   // d and e should be in a batch
+          createFile("f")
+          createFile("g")   // f and g should be in the last batch
+        }
+        q.processAllAvailable()
+        checkNumBatchesSinceLastCheck(2)
+        checkLastBatchData('f', 'g')
+        checkAllData('a' to 'g')
+
+        fileSource.withBatchingLocked {
+          createFile("h")
+          createFile("i")    // h and i should be in a batch
+          createFile("j")
+          createFile("k")   // j and k should be in a batch
+          createFile("l")   // l should be in the last batch
+        }
+        q.processAllAvailable()
+        checkNumBatchesSinceLastCheck(3)
+        checkLastBatchData('l')
+        checkAllData('a' to 'l')
 
-      fileSource.withBatchingLocked {
-        createFile(4)
-        createFile(5)   // 4 and 5 should be in a batch
-        createFile(6)
-        createFile(7)   // 6 and 7 should be in the last batch
-      }
-      q.processAllAvailable()
-      checkNumBatchesSinceLastCheck(2)
-      checkLastBatchData(6, 7)
-      checkAllData(1 to 7)
-
-      fileSource.withBatchingLocked {
-        createFile(8)
-        createFile(9)    // 8 and 9 should be in a batch
-        createFile(10)
-        createFile(11)   // 10 and 11 should be in a batch
-        createFile(12)   // 12 should be in the last batch
+        q.stop()
       }
-      q.processAllAvailable()
-      checkNumBatchesSinceLastCheck(3)
-      checkLastBatchData(12)
-      checkAllData(1 to 12)
-
-      q.stop()
     }
   }
 
-  testQuietly("max files per trigger - incorrect values") {
-    val testTable = "maxFilesPerTrigger_test"
-    withTable(testTable) {
-      withTempDir { case src =>
-        def testMaxFilePerTriggerValue(value: String): Unit = {
-          val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
-          val e = intercept[StreamingQueryException] {
-            // Note: `maxFilesPerTrigger` is checked in the stream thread when creating the source
-            val q = df.writeStream.format("memory").queryName(testTable).start()
-            try {
-              q.processAllAvailable()
-            } finally {
-              q.stop()
+  testQuietly("max bytes per trigger & max files per trigger - incorrect values") {
+    Seq(
+      ("maxBytesPerTrigger_test", "maxBytesPerTrigger"),
+      ("maxFilesPerTrigger_test", "maxFilesPerTrigger")
+    ).foreach { case (testTable, optionName) =>
+      withTable(testTable) {
+        withTempDir { case src =>
+          def testMaxFilePerTriggerValue(value: String): Unit = {
+            val df = spark.readStream.option(optionName, value).text(src.getCanonicalPath)
+            val e = intercept[StreamingQueryException] {
+              // Note: a tested option is checked in the stream thread when creating the source
+              val q = df.writeStream.format("memory").queryName(testTable).start()
+              try {
+                q.processAllAvailable()
+              } finally {
+                q.stop()
+              }
+            }
+            assert(e.getCause.isInstanceOf[IllegalArgumentException])
+            Seq(optionName, value, "positive integer").foreach { s =>
+              assert(e.getMessage.contains(s))
             }
           }
-          assert(e.getCause.isInstanceOf[IllegalArgumentException])
-          Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
-            assert(e.getMessage.contains(s))
-          }
-        }
 
-        testMaxFilePerTriggerValue("not-a-integer")
-        testMaxFilePerTriggerValue("-1")
-        testMaxFilePerTriggerValue("0")
-        testMaxFilePerTriggerValue("10.1")
+          testMaxFilePerTriggerValue("not-a-integer")
+          testMaxFilePerTriggerValue("-1")
+          testMaxFilePerTriggerValue("0")
+          testMaxFilePerTriggerValue("10.1")
+        }
       }
     }
   }
 
-  test("SPARK-30669: maxFilesPerTrigger - ignored when using Trigger.Once") {
-    withTempDirs { (src, target) =>
-      val checkpoint = new File(target, "chk").getCanonicalPath
-      val targetDir = new File(target, "data").getCanonicalPath
-      var lastFileModTime: Option[Long] = None
-
-      /** Create a text file with a single data item */
-      def createFile(data: Int): File = {
-        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
-        if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
-        lastFileModTime = Some(file.lastModified)
-        file
+  testQuietly("max bytes per trigger & max files per trigger - both set") {
+    val testTable = "maxBytesPerTrigger_maxFilesPerTrigger_test"
+    withTable(testTable) {
+      withTempDir { case src =>
+        val df = spark.readStream
+          .option("maxBytesPerTrigger", "1")
+          .option("maxFilesPerTrigger", "1")
+          .text(src.getCanonicalPath)
+        val e = intercept[StreamingQueryException] {
+          // Note: a tested option is checked in the stream thread when creating the source
+          val q = df.writeStream.format("memory").queryName(testTable).start()
+          try {
+            q.processAllAvailable()
+          } finally {
+            q.stop()
+          }
+        }
+        assert(e.getCause.isInstanceOf[IllegalArgumentException])
+        Seq("maxBytesPerTrigger", "maxFilesPerTrigger", "can't be both set").foreach { s =>
+          assert(e.getMessage.contains(s))
+        }
       }
+    }
+  }
 
-      createFile(1)
-      createFile(2)
-      createFile(3)
-
-      // Set up a query to read text files one at a time
-      val df = spark
-        .readStream
-        .option("maxFilesPerTrigger", 1)
-        .text(src.getCanonicalPath)
+  test("SPARK-30669: maxFilesPerTrigger & maxBytesPerTrigger - ignored when using Trigger.Once") {

Review Comment:
   ditto. Please avoid touching the existing SPARK-30669 test case .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1449773508


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1153,174 +1153,213 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger") {
-    withTempDir { case src =>
-      var lastFileModTime: Option[Long] = None
+  test("maxFilesPerTrigger & maxBytesPerTrigger threshold logic must be obeyed") {

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448276815


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1153,174 +1153,213 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger") {
-    withTempDir { case src =>
-      var lastFileModTime: Option[Long] = None
+  test("maxFilesPerTrigger & maxBytesPerTrigger threshold logic must be obeyed") {

Review Comment:
   Should I use prefix `SPARK-46641` for all `maxBytesPerTrigger` related tests or only this one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1454639336


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +117,32 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   *  Split files into a selected/unselected pair according to a total size threshold.
+   */
+  private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long):
+    (Seq[NewFileEntry], Seq[NewFileEntry]) = {
+    var idx = 0
+    var totalSize = 0L
+    val (bFiles, usFiles) = files.span { case NewFileEntry(_, size, _) =>
+      idx += 1
+      totalSize += size
+      idx == 1 || totalSize <= maxSize

Review Comment:
   @dongjoon-hyun 
   I misunderstood and was under impression that throwing exception is expected behavior.
   I adjusted the the code, no exceptions now, file split stops before `Long` overflow, please check it out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1449772885


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1154,25 +1154,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }
 
   test("max files per trigger") {
+    testThresholdLogic("maxFilesPerTrigger")
+  }
+
+  test("SPARK-46641: max bytes per trigger") {
+    testThresholdLogic("maxBytesPerTrigger")
+  }
+
+  private def testThresholdLogic(option: String): Unit = {
     withTempDir { case src =>
       var lastFileModTime: Option[Long] = None
 
       /** Create a text file with a single data item */
-      def createFile(data: Int): File = {
-        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
+      def createFile(data: String): File = {

Review Comment:
   The reason is that I was trying to reuse original code as much as possible and in original version of the code we write not just 1 char strings but also 2 char strings like 10, 11, 12. That will break the logic for maxBytesPerTrigger = 1. So the idea behind the replacement was to make all files of 1 byte size by switching to writing chars('a' to 'l') instead of numbers(1 to 12). Let's me know what you think about it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448235937


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java:
##########
@@ -39,6 +39,8 @@ static ReadLimit minRows(long rows, long maxTriggerDelayMs) {
 
   static ReadLimit maxFiles(int files) { return new ReadMaxFiles(files); }
 
+  static ReadLimit maxBytes(int bytes) { return new ReadMaxBytes(bytes); }

Review Comment:
   fixed



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxBytes.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan files which total
+ * size doesn't go beyond a given maximum total size. Always reads at least one file so a stream
+ * can make progress in case of a file larger than a given maximum.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 4.0.0
+ */
+@Evolving
+public class ReadMaxBytes implements ReadLimit {
+  private int bytes;

Review Comment:
   fixed



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +117,32 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   *  Split files into a selected/unselected pair according to a total size threshold.

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1886528388

   I extracted all reusable code in test to test helper methods. I added prefix `SPARK-46641` to all maxBytesPerTrigger related tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1480827987


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -166,6 +197,25 @@ class FileStreamSource(
         // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
         (newFiles.take(files.maxFiles()), null)
 
+      case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+        // we can cache and reuse remaining fetched list of files in further batches
+        val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
+          takeFilesUntilMax(newFiles, files.maxBytes())
+        if (rSize < (files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong) {

Review Comment:
   @viirya 
   I've converted BigInt to Double and now it is a comparison of two Doubles
   let me know if that is acceptable, we could also do:
   ```
   if (rSize < BigDecimal(files.maxBytes() * DISCARD_UNSEEN_INPUT_RATIO)) {
   ```
   BigInt on the left would be converted to BigDecimal implicitly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1935440058

   Merged to master for Apache Spark 4.0.0.
   
   Congratulations for your first commit, @MaxNevermind .
   
   I added you to the Apache Spark contributor group and assigned SPARK-46641 to you, @MaxNevermind .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1447955436


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1153,174 +1153,213 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger") {
-    withTempDir { case src =>
-      var lastFileModTime: Option[Long] = None
+  test("maxFilesPerTrigger & maxBytesPerTrigger threshold logic must be obeyed") {
+    Seq(
+      "maxFilesPerTrigger",
+      "maxBytesPerTrigger"
+    ).foreach{ thresholdOption =>
+      withTempDir { case src =>
+        var lastFileModTime: Option[Long] = None
+
+        /** Create a text file with a single data item */
+        def createFile(data: String): File = {
+          val file = stringToFile(new File(src, s"$data.txt"), data)
+          if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
+          lastFileModTime = Some(file.lastModified)
+          file
+        }
 
-      /** Create a text file with a single data item */
-      def createFile(data: Int): File = {
-        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
-        if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
-        lastFileModTime = Some(file.lastModified)
-        file
-      }
+        createFile("a")
+        createFile("b")
+        createFile("c")
 
-      createFile(1)
-      createFile(2)
-      createFile(3)
+        // Set up a query to read text files 2 at a time
+        val df = spark
+          .readStream
+          .option(thresholdOption, 2)
+          .text(src.getCanonicalPath)
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName("file_data")
+          .start()
+          .asInstanceOf[StreamingQueryWrapper]
+          .streamingQuery
+        q.processAllAvailable()
+        val memorySink = q.sink.asInstanceOf[MemorySink]
+        val fileSource = getSourcesFromStreamingQuery(q).head
+
+        /** Check the data read in the last batch */
+        def checkLastBatchData(data: Char*): Unit = {
+          val schema = StructType(Seq(StructField("value", StringType)))
+          val df = spark.createDataFrame(
+            spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
+          checkAnswer(df, data.map(_.toString).toDF("value"))
+        }
 
-      // Set up a query to read text files 2 at a time
-      val df = spark
-        .readStream
-        .option("maxFilesPerTrigger", 2)
-        .text(src.getCanonicalPath)
-      val q = df
-        .writeStream
-        .format("memory")
-        .queryName("file_data")
-        .start()
-        .asInstanceOf[StreamingQueryWrapper]
-        .streamingQuery
-      q.processAllAvailable()
-      val memorySink = q.sink.asInstanceOf[MemorySink]
-      val fileSource = getSourcesFromStreamingQuery(q).head
-
-      /** Check the data read in the last batch */
-      def checkLastBatchData(data: Int*): Unit = {
-        val schema = StructType(Seq(StructField("value", StringType)))
-        val df = spark.createDataFrame(
-          spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
-        checkAnswer(df, data.map(_.toString).toDF("value"))
-      }
+        def checkAllData(data: Seq[Char]): Unit = {
+          val schema = StructType(Seq(StructField("value", StringType)))
+          val df = spark.createDataFrame(
+            spark.sparkContext.makeRDD(memorySink.allData), schema)
+          checkAnswer(df, data.map(_.toString).toDF("value"))
+        }
 
-      def checkAllData(data: Seq[Int]): Unit = {
-        val schema = StructType(Seq(StructField("value", StringType)))
-        val df = spark.createDataFrame(
-          spark.sparkContext.makeRDD(memorySink.allData), schema)
-        checkAnswer(df, data.map(_.toString).toDF("value"))
-      }
+        /** Check how many batches have executed since the last time this check was made */
+        var lastBatchId = -1L
+        def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
+          require(lastBatchId >= 0)
+          assert(memorySink.latestBatchId.get === lastBatchId + numBatches)
+          lastBatchId = memorySink.latestBatchId.get
+        }
 
-      /** Check how many batches have executed since the last time this check was made */
-      var lastBatchId = -1L
-      def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
-        require(lastBatchId >= 0)
-        assert(memorySink.latestBatchId.get === lastBatchId + numBatches)
+        checkLastBatchData('c')  // (a and b) should be in batch 1, (c) should be in batch 2 (last)
+        checkAllData('a' to 'c')
         lastBatchId = memorySink.latestBatchId.get
-      }
 
-      checkLastBatchData(3)  // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
-      checkAllData(1 to 3)
-      lastBatchId = memorySink.latestBatchId.get
+        fileSource.withBatchingLocked {
+          createFile("d")
+          createFile("e")   // d and e should be in a batch
+          createFile("f")
+          createFile("g")   // f and g should be in the last batch
+        }
+        q.processAllAvailable()
+        checkNumBatchesSinceLastCheck(2)
+        checkLastBatchData('f', 'g')
+        checkAllData('a' to 'g')
+
+        fileSource.withBatchingLocked {
+          createFile("h")
+          createFile("i")    // h and i should be in a batch
+          createFile("j")
+          createFile("k")   // j and k should be in a batch
+          createFile("l")   // l should be in the last batch
+        }
+        q.processAllAvailable()
+        checkNumBatchesSinceLastCheck(3)
+        checkLastBatchData('l')
+        checkAllData('a' to 'l')
 
-      fileSource.withBatchingLocked {
-        createFile(4)
-        createFile(5)   // 4 and 5 should be in a batch
-        createFile(6)
-        createFile(7)   // 6 and 7 should be in the last batch
-      }
-      q.processAllAvailable()
-      checkNumBatchesSinceLastCheck(2)
-      checkLastBatchData(6, 7)
-      checkAllData(1 to 7)
-
-      fileSource.withBatchingLocked {
-        createFile(8)
-        createFile(9)    // 8 and 9 should be in a batch
-        createFile(10)
-        createFile(11)   // 10 and 11 should be in a batch
-        createFile(12)   // 12 should be in the last batch
+        q.stop()
       }
-      q.processAllAvailable()
-      checkNumBatchesSinceLastCheck(3)
-      checkLastBatchData(12)
-      checkAllData(1 to 12)
-
-      q.stop()
     }
   }
 
-  testQuietly("max files per trigger - incorrect values") {
-    val testTable = "maxFilesPerTrigger_test"
-    withTable(testTable) {
-      withTempDir { case src =>
-        def testMaxFilePerTriggerValue(value: String): Unit = {
-          val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
-          val e = intercept[StreamingQueryException] {
-            // Note: `maxFilesPerTrigger` is checked in the stream thread when creating the source
-            val q = df.writeStream.format("memory").queryName(testTable).start()
-            try {
-              q.processAllAvailable()
-            } finally {
-              q.stop()
+  testQuietly("max bytes per trigger & max files per trigger - incorrect values") {
+    Seq(
+      ("maxBytesPerTrigger_test", "maxBytesPerTrigger"),
+      ("maxFilesPerTrigger_test", "maxFilesPerTrigger")
+    ).foreach { case (testTable, optionName) =>
+      withTable(testTable) {
+        withTempDir { case src =>
+          def testMaxFilePerTriggerValue(value: String): Unit = {

Review Comment:
   Maybe `testMaxFileOrBytesPerTriggerValue`? Or `testMaxOptionPerTriggerValue`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448095648


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxBytes.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan files which total
+ * size doesn't go beyond a given maximum total size. Always reads at least one file so a stream

Review Comment:
   The following description is very important. Please document this behavior at `docs/structured-streaming-programming-guide.md`. Otherwise, this could mislead the users.
   > Always reads at least one file so a stream can make progress in case of a file larger than a given maximum.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1925504185

   @viirya @dongjoon-hyun 
   For PR to be merged is it supposed to reach a certain count of approves? Just trying to understand the next steps.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448084408


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxBytes.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan files which total
+ * size doesn't go beyond a given maximum total size. Always reads at least one file so a stream
+ * can make progress in case of a file larger than a given maximum.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 4.0.0
+ */
+@Evolving
+public class ReadMaxBytes implements ReadLimit {
+  private int bytes;

Review Comment:
   ditto. `int` looks too small.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448086330


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +117,32 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   *  Split files into a selected/unselected pair according to a total size threshold.

Review Comment:
   nit. An extra space before the word, `Split`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448099879


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -379,6 +418,9 @@ object FileStreamSource {
     def sparkPath: SparkPath = SparkPath.fromUrlString(path)
   }
 
+  /** Newly fetched files metadata holder. */
+  private case class NewFileEntry(path: SparkPath, size: Long, timestamp: Long)

Review Comment:
   The file size seems to be already `Long` which is greater than `Int`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448531210


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +117,32 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   *  Split files into a selected/unselected pair according to a total size threshold.
+   */
+  private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long):
+    (Seq[NewFileEntry], Seq[NewFileEntry]) = {
+    var idx = 0
+    var totalSize = 0L
+    val (bFiles, usFiles) = files.span { case NewFileEntry(_, size, _) =>
+      idx += 1
+      totalSize += size
+      idx == 1 || totalSize <= maxSize

Review Comment:
   Yes, of course, we had better have a safe code from the beginning for the future. We can check that before doing `totalSize += size`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448084120


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java:
##########
@@ -39,6 +39,8 @@ static ReadLimit minRows(long rows, long maxTriggerDelayMs) {
 
   static ReadLimit maxFiles(int files) { return new ReadMaxFiles(files); }
 
+  static ReadLimit maxBytes(int bytes) { return new ReadMaxBytes(bytes); }

Review Comment:
   Ur, why this is `int` instead of `long`? 2GB limit looks too small to me. Is there any reason, @MaxNevermind ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1470707325


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1154,25 +1154,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }
 
   test("max files per trigger") {
+    testThresholdLogic("maxFilesPerTrigger")
+  }
+
+  test("SPARK-46641: max bytes per trigger") {
+    testThresholdLogic("maxBytesPerTrigger")
+  }
+
+  private def testThresholdLogic(option: String): Unit = {
     withTempDir { case src =>
       var lastFileModTime: Option[Long] = None
 
       /** Create a text file with a single data item */

Review Comment:
   Could you add this backward-compatible method?
   ```scala
         def createFile(data: Int): File = {
           createFile(data.toString)
         }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1470755738


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -166,6 +197,25 @@ class FileStreamSource(
         // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
         (newFiles.take(files.maxFiles()), null)
 
+      case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+        // we can cache and reuse remaining fetched list of files in further batches
+        val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
+          takeFilesUntilMax(newFiles, files.maxBytes())
+        if (rSize < (files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong) {

Review Comment:
   I think `DISCARD_UNSEEN_FILES_RATIO` is designed for max files `ReadMaxFiles`. I'm not sure if it makes sense to directly apply it for `ReadMaxBytes`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1480827987


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -166,6 +197,25 @@ class FileStreamSource(
         // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
         (newFiles.take(files.maxFiles()), null)
 
+      case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+        // we can cache and reuse remaining fetched list of files in further batches
+        val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
+          takeFilesUntilMax(newFiles, files.maxBytes())
+        if (rSize < (files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong) {

Review Comment:
   @viirya 
   I've converted BigInt to Double and now it is a comparison of two Doubles
   let me know if that is acceptable, we could also do:
   ```
   if (rSize < BigDecimal(files.maxBytes() * DISCARD_UNSEEN_INPUT_RATIO)) {
   ```
   BigInt on the left would be converted to BigDecimal implicitly by compiler



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -166,6 +197,25 @@ class FileStreamSource(
         // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
         (newFiles.take(files.maxFiles()), null)
 
+      case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+        // we can cache and reuse remaining fetched list of files in further batches
+        val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
+          takeFilesUntilMax(newFiles, files.maxBytes())
+        if (rSize < (files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong) {

Review Comment:
   @viirya 
   I've converted BigInt to Double and now it is a comparison of two Doubles
   let me know if that is acceptable, we could also do:
   ```
   if (rSize < BigDecimal(files.maxBytes() * DISCARD_UNSEEN_INPUT_RATIO)) {
   ```
   BigInt on the left would be converted to BigDecimal implicitly by a compiler



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1480266624


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -166,6 +197,25 @@ class FileStreamSource(
         // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
         (newFiles.take(files.maxFiles()), null)
 
+      case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+        // we can cache and reuse remaining fetched list of files in further batches
+        val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
+          takeFilesUntilMax(newFiles, files.maxBytes())
+        if (rSize < (files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong) {

Review Comment:
   To be clear, If `rSize` is a value larger than `Lonx.MaxValue`, once you did `rSize.toLong`, it may possibly cast to a long value truncated to 1, for example. So the condition `rSize.toLong < (files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong` will wrongly evaluate to `true`, although `rSize` is actually larger than `files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong`.
   
   To compare two values in different ranges, it seems more reasonable to compare them in wider range, instead of narrow one (as you will truncate the value from wider range).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1930385784

   Replied the comment. I also replied one issue on truncating `BigInt` value when comparing it with a long: https://github.com/apache/spark/pull/44636#discussion_r1470767776


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1470694281


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +118,42 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   * Split files into a selected/unselected pair according to a total size threshold.
+   * Always puts the 1st element in a left split and keep adding it to a left split
+   * until reaches a specified threshold or [[Long.MaxValue]].
+   */
+  private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long):
+  (FilesSplit, FilesSplit) = {

Review Comment:
   FYI, Apache Spark community follows the following Scala guideline.
   - https://spark.apache.org/contributing.html
   > For Scala code, Apache Spark follows the official [Scala style guide](http://docs.scala-lang.org/style/) and [Databricks Scala guide](https://github.com/databricks/scala-style-guide). The latter is preferred. To format Scala code, run ./dev/scalafmt prior to submitting a PR.
   
   This is the guideline for this line.
   - https://github.com/databricks/scala-style-guide?tab=readme-ov-file#spacing-and-indentation
   > Return types can be either on the same line as the last parameter, or start a new line with 2 space indent.
   
   Please fix like this.
   ```scala
   - private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long):
   - (FilesSplit, FilesSplit) = {
   + private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long)
   +   : (FilesSplit, FilesSplit) = {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448545607


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1323,6 +1374,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         q2.stop()
       }
     }
+

Review Comment:
   ditto. This extra empty line at the end of method is not an Apache Spark coding-style.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448102251


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1153,174 +1153,213 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger") {
-    withTempDir { case src =>
-      var lastFileModTime: Option[Long] = None
+  test("maxFilesPerTrigger & maxBytesPerTrigger threshold logic must be obeyed") {
+    Seq(
+      "maxFilesPerTrigger",
+      "maxBytesPerTrigger"
+    ).foreach{ thresholdOption =>
+      withTempDir { case src =>
+        var lastFileModTime: Option[Long] = None
+
+        /** Create a text file with a single data item */
+        def createFile(data: String): File = {
+          val file = stringToFile(new File(src, s"$data.txt"), data)
+          if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
+          lastFileModTime = Some(file.lastModified)
+          file
+        }
 
-      /** Create a text file with a single data item */
-      def createFile(data: Int): File = {
-        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
-        if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
-        lastFileModTime = Some(file.lastModified)
-        file
-      }
+        createFile("a")
+        createFile("b")
+        createFile("c")
 
-      createFile(1)
-      createFile(2)
-      createFile(3)
+        // Set up a query to read text files 2 at a time
+        val df = spark
+          .readStream
+          .option(thresholdOption, 2)
+          .text(src.getCanonicalPath)
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName("file_data")
+          .start()
+          .asInstanceOf[StreamingQueryWrapper]
+          .streamingQuery
+        q.processAllAvailable()
+        val memorySink = q.sink.asInstanceOf[MemorySink]
+        val fileSource = getSourcesFromStreamingQuery(q).head
+
+        /** Check the data read in the last batch */
+        def checkLastBatchData(data: Char*): Unit = {
+          val schema = StructType(Seq(StructField("value", StringType)))
+          val df = spark.createDataFrame(
+            spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
+          checkAnswer(df, data.map(_.toString).toDF("value"))
+        }
 
-      // Set up a query to read text files 2 at a time
-      val df = spark
-        .readStream
-        .option("maxFilesPerTrigger", 2)
-        .text(src.getCanonicalPath)
-      val q = df
-        .writeStream
-        .format("memory")
-        .queryName("file_data")
-        .start()
-        .asInstanceOf[StreamingQueryWrapper]
-        .streamingQuery
-      q.processAllAvailable()
-      val memorySink = q.sink.asInstanceOf[MemorySink]
-      val fileSource = getSourcesFromStreamingQuery(q).head
-
-      /** Check the data read in the last batch */
-      def checkLastBatchData(data: Int*): Unit = {
-        val schema = StructType(Seq(StructField("value", StringType)))
-        val df = spark.createDataFrame(
-          spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
-        checkAnswer(df, data.map(_.toString).toDF("value"))
-      }
+        def checkAllData(data: Seq[Char]): Unit = {
+          val schema = StructType(Seq(StructField("value", StringType)))
+          val df = spark.createDataFrame(
+            spark.sparkContext.makeRDD(memorySink.allData), schema)
+          checkAnswer(df, data.map(_.toString).toDF("value"))
+        }
 
-      def checkAllData(data: Seq[Int]): Unit = {
-        val schema = StructType(Seq(StructField("value", StringType)))
-        val df = spark.createDataFrame(
-          spark.sparkContext.makeRDD(memorySink.allData), schema)
-        checkAnswer(df, data.map(_.toString).toDF("value"))
-      }
+        /** Check how many batches have executed since the last time this check was made */
+        var lastBatchId = -1L
+        def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
+          require(lastBatchId >= 0)
+          assert(memorySink.latestBatchId.get === lastBatchId + numBatches)
+          lastBatchId = memorySink.latestBatchId.get
+        }
 
-      /** Check how many batches have executed since the last time this check was made */
-      var lastBatchId = -1L
-      def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
-        require(lastBatchId >= 0)
-        assert(memorySink.latestBatchId.get === lastBatchId + numBatches)
+        checkLastBatchData('c')  // (a and b) should be in batch 1, (c) should be in batch 2 (last)
+        checkAllData('a' to 'c')
         lastBatchId = memorySink.latestBatchId.get
-      }
 
-      checkLastBatchData(3)  // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
-      checkAllData(1 to 3)
-      lastBatchId = memorySink.latestBatchId.get
+        fileSource.withBatchingLocked {
+          createFile("d")
+          createFile("e")   // d and e should be in a batch
+          createFile("f")
+          createFile("g")   // f and g should be in the last batch
+        }
+        q.processAllAvailable()
+        checkNumBatchesSinceLastCheck(2)
+        checkLastBatchData('f', 'g')
+        checkAllData('a' to 'g')
+
+        fileSource.withBatchingLocked {
+          createFile("h")
+          createFile("i")    // h and i should be in a batch
+          createFile("j")
+          createFile("k")   // j and k should be in a batch
+          createFile("l")   // l should be in the last batch
+        }
+        q.processAllAvailable()
+        checkNumBatchesSinceLastCheck(3)
+        checkLastBatchData('l')
+        checkAllData('a' to 'l')
 
-      fileSource.withBatchingLocked {
-        createFile(4)
-        createFile(5)   // 4 and 5 should be in a batch
-        createFile(6)
-        createFile(7)   // 6 and 7 should be in the last batch
-      }
-      q.processAllAvailable()
-      checkNumBatchesSinceLastCheck(2)
-      checkLastBatchData(6, 7)
-      checkAllData(1 to 7)
-
-      fileSource.withBatchingLocked {
-        createFile(8)
-        createFile(9)    // 8 and 9 should be in a batch
-        createFile(10)
-        createFile(11)   // 10 and 11 should be in a batch
-        createFile(12)   // 12 should be in the last batch
+        q.stop()
       }
-      q.processAllAvailable()
-      checkNumBatchesSinceLastCheck(3)
-      checkLastBatchData(12)
-      checkAllData(1 to 12)
-
-      q.stop()
     }
   }
 
-  testQuietly("max files per trigger - incorrect values") {
-    val testTable = "maxFilesPerTrigger_test"
-    withTable(testTable) {
-      withTempDir { case src =>
-        def testMaxFilePerTriggerValue(value: String): Unit = {
-          val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
-          val e = intercept[StreamingQueryException] {
-            // Note: `maxFilesPerTrigger` is checked in the stream thread when creating the source
-            val q = df.writeStream.format("memory").queryName(testTable).start()
-            try {
-              q.processAllAvailable()
-            } finally {
-              q.stop()
+  testQuietly("max bytes per trigger & max files per trigger - incorrect values") {
+    Seq(
+      ("maxBytesPerTrigger_test", "maxBytesPerTrigger"),
+      ("maxFilesPerTrigger_test", "maxFilesPerTrigger")
+    ).foreach { case (testTable, optionName) =>
+      withTable(testTable) {
+        withTempDir { case src =>
+          def testMaxFilePerTriggerValue(value: String): Unit = {
+            val df = spark.readStream.option(optionName, value).text(src.getCanonicalPath)
+            val e = intercept[StreamingQueryException] {
+              // Note: a tested option is checked in the stream thread when creating the source
+              val q = df.writeStream.format("memory").queryName(testTable).start()
+              try {
+                q.processAllAvailable()
+              } finally {
+                q.stop()
+              }
+            }
+            assert(e.getCause.isInstanceOf[IllegalArgumentException])
+            Seq(optionName, value, "positive integer").foreach { s =>
+              assert(e.getMessage.contains(s))
             }
           }
-          assert(e.getCause.isInstanceOf[IllegalArgumentException])
-          Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
-            assert(e.getMessage.contains(s))
-          }
-        }
 
-        testMaxFilePerTriggerValue("not-a-integer")
-        testMaxFilePerTriggerValue("-1")
-        testMaxFilePerTriggerValue("0")
-        testMaxFilePerTriggerValue("10.1")
+          testMaxFilePerTriggerValue("not-a-integer")
+          testMaxFilePerTriggerValue("-1")
+          testMaxFilePerTriggerValue("0")
+          testMaxFilePerTriggerValue("10.1")
+        }
       }
     }
   }
 
-  test("SPARK-30669: maxFilesPerTrigger - ignored when using Trigger.Once") {
-    withTempDirs { (src, target) =>
-      val checkpoint = new File(target, "chk").getCanonicalPath
-      val targetDir = new File(target, "data").getCanonicalPath
-      var lastFileModTime: Option[Long] = None
-
-      /** Create a text file with a single data item */
-      def createFile(data: Int): File = {
-        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
-        if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
-        lastFileModTime = Some(file.lastModified)
-        file
+  testQuietly("max bytes per trigger & max files per trigger - both set") {

Review Comment:
   This is valid because this is the negative case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1449773657


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala:
##########
@@ -50,11 +50,25 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
     }
   }
 
+  val maxBytesPerTrigger: Option[Long] = parameters.get("maxBytesPerTrigger").map { str =>
+    Try(str.toInt).toOption.filter(_ > 0).map(op =>

Review Comment:
   fixed



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1202,51 +1210,60 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
       /** Check how many batches have executed since the last time this check was made */
       var lastBatchId = -1L
+

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-XXXX][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1882494537

   @liancheng 
   Please review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448542354


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1202,51 +1210,60 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
       /** Check how many batches have executed since the last time this check was made */
       var lastBatchId = -1L
+

Review Comment:
   Please remove this extra style change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448087582


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +117,32 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   *  Split files into a selected/unselected pair according to a total size threshold.
+   */
+  private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long):
+    (Seq[NewFileEntry], Seq[NewFileEntry]) = {
+    var idx = 0
+    var totalSize = 0L
+    val (bFiles, usFiles) = files.span { case NewFileEntry(_, size, _) =>
+      idx += 1
+      totalSize += size
+      idx == 1 || totalSize <= maxSize

Review Comment:
   If we change the type of `maxSize` from `Int` to `Long`, line 141 and 142 should be revised in order to avoid overflow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-XXXX][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1882495506

   @viirya 
   Please review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1935438989

   Hi, @MaxNevermind . It was a comment for @viirya .
   
   Let me merge this. :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1931178064

   Pushed another commit. One issue was resolved.
   @viirya 
   please check out a solution for the last remaining proposed issue above
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1470701871


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -166,6 +197,25 @@ class FileStreamSource(
         // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
         (newFiles.take(files.maxFiles()), null)
 
+      case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+        // we can cache and reuse remaining fetched list of files in further batches
+        val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
+          takeFilesUntilMax(newFiles, files.maxBytes())
+        if (rSize < (files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong) {

Review Comment:
   `.toLong` is required here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1925533439

   No, just want to confirm if there is no actin point to me here right now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-XXXX][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1445690700


##########
docs/structured-streaming-programming-guide.md:
##########
@@ -561,6 +561,8 @@ Here are the details of all the sources in Spark.
         <br/>
         <code>maxFilesPerTrigger</code>: maximum number of new files to be considered in every trigger (default: no max)
         <br/>
+        <code>maxBytesPerTrigger</code>: maximum total size of new files to be considered in every trigger (default: no max). maxBytesPerTrigger and maxFilesPerTrigger can't both be set at the same time, only one of two must be choosen.

Review Comment:
   chosen



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1446772037


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala:
##########
@@ -50,11 +50,25 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
     }
   }
 
+  val maxBytesPerTrigger: Option[Int] = parameters.get("maxBytesPerTrigger").map { str =>
+    Try(str.toInt).toOption.filter(_ > 0).map(op =>
+      if (maxFilesPerTrigger.nonEmpty) {
+        throw new IllegalArgumentException(
+          s"Options 'maxFilesPerTrigger' and 'maxBytesPerTrigger' " +
+            s"can't be both set at the same time")

Review Comment:
   fixed



##########
docs/structured-streaming-programming-guide.md:
##########
@@ -561,6 +561,8 @@ Here are the details of all the sources in Spark.
         <br/>
         <code>maxFilesPerTrigger</code>: maximum number of new files to be considered in every trigger (default: no max)
         <br/>
+        <code>maxBytesPerTrigger</code>: maximum total size of new files to be considered in every trigger (default: no max). maxBytesPerTrigger and maxFilesPerTrigger can't both be set at the same time, only one of two must be choosen.

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1885273239

   @viirya 
   Fixed the issues, tests are passing now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448525529


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala:
##########
@@ -50,11 +50,25 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
     }
   }
 
+  val maxBytesPerTrigger: Option[Long] = parameters.get("maxBytesPerTrigger").map { str =>
+    Try(str.toInt).toOption.filter(_ > 0).map(op =>

Review Comment:
   `str.toInt` -> `str.toLong`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448545475


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1300,6 +1350,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
           .option("checkpointLocation", checkpoint)
           .start(targetDir)
       }
+

Review Comment:
   ditto. Please remove this extra empty line addition.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1323,6 +1374,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         q2.stop()
       }
     }
+

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1886702847

   Thank you for update, @MaxNevermind . It looks much better. I left the second round review comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448132991


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadLimit.java:
##########
@@ -39,6 +39,8 @@ static ReadLimit minRows(long rows, long maxTriggerDelayMs) {
 
   static ReadLimit maxFiles(int files) { return new ReadMaxFiles(files); }
 
+  static ReadLimit maxBytes(int bytes) { return new ReadMaxBytes(bytes); }

Review Comment:
   Good catch. `long` looks more proper.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448235720


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1153,174 +1153,213 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger") {
-    withTempDir { case src =>
-      var lastFileModTime: Option[Long] = None
+  test("maxFilesPerTrigger & maxBytesPerTrigger threshold logic must be obeyed") {
+    Seq(
+      "maxFilesPerTrigger",
+      "maxBytesPerTrigger"
+    ).foreach{ thresholdOption =>
+      withTempDir { case src =>
+        var lastFileModTime: Option[Long] = None
+
+        /** Create a text file with a single data item */
+        def createFile(data: String): File = {
+          val file = stringToFile(new File(src, s"$data.txt"), data)
+          if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
+          lastFileModTime = Some(file.lastModified)
+          file
+        }
 
-      /** Create a text file with a single data item */
-      def createFile(data: Int): File = {
-        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
-        if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
-        lastFileModTime = Some(file.lastModified)
-        file
-      }
+        createFile("a")
+        createFile("b")
+        createFile("c")
 
-      createFile(1)
-      createFile(2)
-      createFile(3)
+        // Set up a query to read text files 2 at a time
+        val df = spark
+          .readStream
+          .option(thresholdOption, 2)
+          .text(src.getCanonicalPath)
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName("file_data")
+          .start()
+          .asInstanceOf[StreamingQueryWrapper]
+          .streamingQuery
+        q.processAllAvailable()
+        val memorySink = q.sink.asInstanceOf[MemorySink]
+        val fileSource = getSourcesFromStreamingQuery(q).head
+
+        /** Check the data read in the last batch */
+        def checkLastBatchData(data: Char*): Unit = {
+          val schema = StructType(Seq(StructField("value", StringType)))
+          val df = spark.createDataFrame(
+            spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
+          checkAnswer(df, data.map(_.toString).toDF("value"))
+        }
 
-      // Set up a query to read text files 2 at a time
-      val df = spark
-        .readStream
-        .option("maxFilesPerTrigger", 2)
-        .text(src.getCanonicalPath)
-      val q = df
-        .writeStream
-        .format("memory")
-        .queryName("file_data")
-        .start()
-        .asInstanceOf[StreamingQueryWrapper]
-        .streamingQuery
-      q.processAllAvailable()
-      val memorySink = q.sink.asInstanceOf[MemorySink]
-      val fileSource = getSourcesFromStreamingQuery(q).head
-
-      /** Check the data read in the last batch */
-      def checkLastBatchData(data: Int*): Unit = {
-        val schema = StructType(Seq(StructField("value", StringType)))
-        val df = spark.createDataFrame(
-          spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
-        checkAnswer(df, data.map(_.toString).toDF("value"))
-      }
+        def checkAllData(data: Seq[Char]): Unit = {
+          val schema = StructType(Seq(StructField("value", StringType)))
+          val df = spark.createDataFrame(
+            spark.sparkContext.makeRDD(memorySink.allData), schema)
+          checkAnswer(df, data.map(_.toString).toDF("value"))
+        }
 
-      def checkAllData(data: Seq[Int]): Unit = {
-        val schema = StructType(Seq(StructField("value", StringType)))
-        val df = spark.createDataFrame(
-          spark.sparkContext.makeRDD(memorySink.allData), schema)
-        checkAnswer(df, data.map(_.toString).toDF("value"))
-      }
+        /** Check how many batches have executed since the last time this check was made */
+        var lastBatchId = -1L
+        def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
+          require(lastBatchId >= 0)
+          assert(memorySink.latestBatchId.get === lastBatchId + numBatches)
+          lastBatchId = memorySink.latestBatchId.get
+        }
 
-      /** Check how many batches have executed since the last time this check was made */
-      var lastBatchId = -1L
-      def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
-        require(lastBatchId >= 0)
-        assert(memorySink.latestBatchId.get === lastBatchId + numBatches)
+        checkLastBatchData('c')  // (a and b) should be in batch 1, (c) should be in batch 2 (last)
+        checkAllData('a' to 'c')
         lastBatchId = memorySink.latestBatchId.get
-      }
 
-      checkLastBatchData(3)  // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
-      checkAllData(1 to 3)
-      lastBatchId = memorySink.latestBatchId.get
+        fileSource.withBatchingLocked {
+          createFile("d")
+          createFile("e")   // d and e should be in a batch
+          createFile("f")
+          createFile("g")   // f and g should be in the last batch
+        }
+        q.processAllAvailable()
+        checkNumBatchesSinceLastCheck(2)
+        checkLastBatchData('f', 'g')
+        checkAllData('a' to 'g')
+
+        fileSource.withBatchingLocked {
+          createFile("h")
+          createFile("i")    // h and i should be in a batch
+          createFile("j")
+          createFile("k")   // j and k should be in a batch
+          createFile("l")   // l should be in the last batch
+        }
+        q.processAllAvailable()
+        checkNumBatchesSinceLastCheck(3)
+        checkLastBatchData('l')
+        checkAllData('a' to 'l')
 
-      fileSource.withBatchingLocked {
-        createFile(4)
-        createFile(5)   // 4 and 5 should be in a batch
-        createFile(6)
-        createFile(7)   // 6 and 7 should be in the last batch
-      }
-      q.processAllAvailable()
-      checkNumBatchesSinceLastCheck(2)
-      checkLastBatchData(6, 7)
-      checkAllData(1 to 7)
-
-      fileSource.withBatchingLocked {
-        createFile(8)
-        createFile(9)    // 8 and 9 should be in a batch
-        createFile(10)
-        createFile(11)   // 10 and 11 should be in a batch
-        createFile(12)   // 12 should be in the last batch
+        q.stop()
       }
-      q.processAllAvailable()
-      checkNumBatchesSinceLastCheck(3)
-      checkLastBatchData(12)
-      checkAllData(1 to 12)
-
-      q.stop()
     }
   }
 
-  testQuietly("max files per trigger - incorrect values") {
-    val testTable = "maxFilesPerTrigger_test"
-    withTable(testTable) {
-      withTempDir { case src =>
-        def testMaxFilePerTriggerValue(value: String): Unit = {
-          val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
-          val e = intercept[StreamingQueryException] {
-            // Note: `maxFilesPerTrigger` is checked in the stream thread when creating the source
-            val q = df.writeStream.format("memory").queryName(testTable).start()
-            try {
-              q.processAllAvailable()
-            } finally {
-              q.stop()
+  testQuietly("max bytes per trigger & max files per trigger - incorrect values") {
+    Seq(
+      ("maxBytesPerTrigger_test", "maxBytesPerTrigger"),
+      ("maxFilesPerTrigger_test", "maxFilesPerTrigger")
+    ).foreach { case (testTable, optionName) =>
+      withTable(testTable) {
+        withTempDir { case src =>
+          def testMaxFilePerTriggerValue(value: String): Unit = {

Review Comment:
   fixed



##########
docs/structured-streaming-programming-guide.md:
##########
@@ -561,6 +561,8 @@ Here are the details of all the sources in Spark.
         <br/>
         <code>maxFilesPerTrigger</code>: maximum number of new files to be considered in every trigger (default: no max)
         <br/>
+        <code>maxBytesPerTrigger</code>: maximum total size of new files to be considered in every trigger (default: no max). maxBytesPerTrigger and maxFilesPerTrigger can't both be set at the same time, only one of two must be chosen.

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448240399


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +117,32 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   *  Split files into a selected/unselected pair according to a total size threshold.
+   */
+  private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long):
+    (Seq[NewFileEntry], Seq[NewFileEntry]) = {
+    var idx = 0
+    var totalSize = 0L
+    val (bFiles, usFiles) = files.span { case NewFileEntry(_, size, _) =>
+      idx += 1
+      totalSize += size
+      idx == 1 || totalSize <= maxSize

Review Comment:
   As I understand overflow might happen if a sum of input file sizes(in bytes) will be bigger than Long.MaxValue in bytes which is ~ 9 000 PB. Should we consider that a possibility?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1449773345


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1153,174 +1153,213 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger") {
-    withTempDir { case src =>
-      var lastFileModTime: Option[Long] = None
+  test("maxFilesPerTrigger & maxBytesPerTrigger threshold logic must be obeyed") {
+    Seq(
+      "maxFilesPerTrigger",
+      "maxBytesPerTrigger"
+    ).foreach{ thresholdOption =>
+      withTempDir { case src =>
+        var lastFileModTime: Option[Long] = None
+
+        /** Create a text file with a single data item */
+        def createFile(data: String): File = {
+          val file = stringToFile(new File(src, s"$data.txt"), data)
+          if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
+          lastFileModTime = Some(file.lastModified)
+          file
+        }
 
-      /** Create a text file with a single data item */
-      def createFile(data: Int): File = {
-        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
-        if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
-        lastFileModTime = Some(file.lastModified)
-        file
-      }
+        createFile("a")
+        createFile("b")
+        createFile("c")
 
-      createFile(1)
-      createFile(2)
-      createFile(3)
+        // Set up a query to read text files 2 at a time
+        val df = spark
+          .readStream
+          .option(thresholdOption, 2)
+          .text(src.getCanonicalPath)
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName("file_data")
+          .start()
+          .asInstanceOf[StreamingQueryWrapper]
+          .streamingQuery
+        q.processAllAvailable()
+        val memorySink = q.sink.asInstanceOf[MemorySink]
+        val fileSource = getSourcesFromStreamingQuery(q).head
+
+        /** Check the data read in the last batch */
+        def checkLastBatchData(data: Char*): Unit = {
+          val schema = StructType(Seq(StructField("value", StringType)))
+          val df = spark.createDataFrame(
+            spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
+          checkAnswer(df, data.map(_.toString).toDF("value"))
+        }
 
-      // Set up a query to read text files 2 at a time
-      val df = spark
-        .readStream
-        .option("maxFilesPerTrigger", 2)
-        .text(src.getCanonicalPath)
-      val q = df
-        .writeStream
-        .format("memory")
-        .queryName("file_data")
-        .start()
-        .asInstanceOf[StreamingQueryWrapper]
-        .streamingQuery
-      q.processAllAvailable()
-      val memorySink = q.sink.asInstanceOf[MemorySink]
-      val fileSource = getSourcesFromStreamingQuery(q).head
-
-      /** Check the data read in the last batch */
-      def checkLastBatchData(data: Int*): Unit = {
-        val schema = StructType(Seq(StructField("value", StringType)))
-        val df = spark.createDataFrame(
-          spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
-        checkAnswer(df, data.map(_.toString).toDF("value"))
-      }
+        def checkAllData(data: Seq[Char]): Unit = {
+          val schema = StructType(Seq(StructField("value", StringType)))
+          val df = spark.createDataFrame(
+            spark.sparkContext.makeRDD(memorySink.allData), schema)
+          checkAnswer(df, data.map(_.toString).toDF("value"))
+        }
 
-      def checkAllData(data: Seq[Int]): Unit = {
-        val schema = StructType(Seq(StructField("value", StringType)))
-        val df = spark.createDataFrame(
-          spark.sparkContext.makeRDD(memorySink.allData), schema)
-        checkAnswer(df, data.map(_.toString).toDF("value"))
-      }
+        /** Check how many batches have executed since the last time this check was made */
+        var lastBatchId = -1L
+        def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
+          require(lastBatchId >= 0)
+          assert(memorySink.latestBatchId.get === lastBatchId + numBatches)
+          lastBatchId = memorySink.latestBatchId.get
+        }
 
-      /** Check how many batches have executed since the last time this check was made */
-      var lastBatchId = -1L
-      def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
-        require(lastBatchId >= 0)
-        assert(memorySink.latestBatchId.get === lastBatchId + numBatches)
+        checkLastBatchData('c')  // (a and b) should be in batch 1, (c) should be in batch 2 (last)
+        checkAllData('a' to 'c')
         lastBatchId = memorySink.latestBatchId.get
-      }
 
-      checkLastBatchData(3)  // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
-      checkAllData(1 to 3)
-      lastBatchId = memorySink.latestBatchId.get
+        fileSource.withBatchingLocked {
+          createFile("d")
+          createFile("e")   // d and e should be in a batch
+          createFile("f")
+          createFile("g")   // f and g should be in the last batch
+        }
+        q.processAllAvailable()
+        checkNumBatchesSinceLastCheck(2)
+        checkLastBatchData('f', 'g')
+        checkAllData('a' to 'g')
+
+        fileSource.withBatchingLocked {
+          createFile("h")
+          createFile("i")    // h and i should be in a batch
+          createFile("j")
+          createFile("k")   // j and k should be in a batch
+          createFile("l")   // l should be in the last batch
+        }
+        q.processAllAvailable()
+        checkNumBatchesSinceLastCheck(3)
+        checkLastBatchData('l')
+        checkAllData('a' to 'l')
 
-      fileSource.withBatchingLocked {
-        createFile(4)
-        createFile(5)   // 4 and 5 should be in a batch
-        createFile(6)
-        createFile(7)   // 6 and 7 should be in the last batch
-      }
-      q.processAllAvailable()
-      checkNumBatchesSinceLastCheck(2)
-      checkLastBatchData(6, 7)
-      checkAllData(1 to 7)
-
-      fileSource.withBatchingLocked {
-        createFile(8)
-        createFile(9)    // 8 and 9 should be in a batch
-        createFile(10)
-        createFile(11)   // 10 and 11 should be in a batch
-        createFile(12)   // 12 should be in the last batch
+        q.stop()
       }
-      q.processAllAvailable()
-      checkNumBatchesSinceLastCheck(3)
-      checkLastBatchData(12)
-      checkAllData(1 to 12)
-
-      q.stop()
     }
   }
 
-  testQuietly("max files per trigger - incorrect values") {
-    val testTable = "maxFilesPerTrigger_test"
-    withTable(testTable) {
-      withTempDir { case src =>
-        def testMaxFilePerTriggerValue(value: String): Unit = {
-          val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
-          val e = intercept[StreamingQueryException] {
-            // Note: `maxFilesPerTrigger` is checked in the stream thread when creating the source
-            val q = df.writeStream.format("memory").queryName(testTable).start()
-            try {
-              q.processAllAvailable()
-            } finally {
-              q.stop()
+  testQuietly("max bytes per trigger & max files per trigger - incorrect values") {

Review Comment:
   fixed



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1153,174 +1153,213 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger") {
-    withTempDir { case src =>
-      var lastFileModTime: Option[Long] = None
+  test("maxFilesPerTrigger & maxBytesPerTrigger threshold logic must be obeyed") {
+    Seq(
+      "maxFilesPerTrigger",
+      "maxBytesPerTrigger"
+    ).foreach{ thresholdOption =>
+      withTempDir { case src =>
+        var lastFileModTime: Option[Long] = None
+
+        /** Create a text file with a single data item */
+        def createFile(data: String): File = {
+          val file = stringToFile(new File(src, s"$data.txt"), data)
+          if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
+          lastFileModTime = Some(file.lastModified)
+          file
+        }
 
-      /** Create a text file with a single data item */
-      def createFile(data: Int): File = {
-        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
-        if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
-        lastFileModTime = Some(file.lastModified)
-        file
-      }
+        createFile("a")
+        createFile("b")
+        createFile("c")
 
-      createFile(1)
-      createFile(2)
-      createFile(3)
+        // Set up a query to read text files 2 at a time
+        val df = spark
+          .readStream
+          .option(thresholdOption, 2)
+          .text(src.getCanonicalPath)
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName("file_data")
+          .start()
+          .asInstanceOf[StreamingQueryWrapper]
+          .streamingQuery
+        q.processAllAvailable()
+        val memorySink = q.sink.asInstanceOf[MemorySink]
+        val fileSource = getSourcesFromStreamingQuery(q).head
+
+        /** Check the data read in the last batch */
+        def checkLastBatchData(data: Char*): Unit = {
+          val schema = StructType(Seq(StructField("value", StringType)))
+          val df = spark.createDataFrame(
+            spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
+          checkAnswer(df, data.map(_.toString).toDF("value"))
+        }
 
-      // Set up a query to read text files 2 at a time
-      val df = spark
-        .readStream
-        .option("maxFilesPerTrigger", 2)
-        .text(src.getCanonicalPath)
-      val q = df
-        .writeStream
-        .format("memory")
-        .queryName("file_data")
-        .start()
-        .asInstanceOf[StreamingQueryWrapper]
-        .streamingQuery
-      q.processAllAvailable()
-      val memorySink = q.sink.asInstanceOf[MemorySink]
-      val fileSource = getSourcesFromStreamingQuery(q).head
-
-      /** Check the data read in the last batch */
-      def checkLastBatchData(data: Int*): Unit = {
-        val schema = StructType(Seq(StructField("value", StringType)))
-        val df = spark.createDataFrame(
-          spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
-        checkAnswer(df, data.map(_.toString).toDF("value"))
-      }
+        def checkAllData(data: Seq[Char]): Unit = {
+          val schema = StructType(Seq(StructField("value", StringType)))
+          val df = spark.createDataFrame(
+            spark.sparkContext.makeRDD(memorySink.allData), schema)
+          checkAnswer(df, data.map(_.toString).toDF("value"))
+        }
 
-      def checkAllData(data: Seq[Int]): Unit = {
-        val schema = StructType(Seq(StructField("value", StringType)))
-        val df = spark.createDataFrame(
-          spark.sparkContext.makeRDD(memorySink.allData), schema)
-        checkAnswer(df, data.map(_.toString).toDF("value"))
-      }
+        /** Check how many batches have executed since the last time this check was made */
+        var lastBatchId = -1L
+        def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
+          require(lastBatchId >= 0)
+          assert(memorySink.latestBatchId.get === lastBatchId + numBatches)
+          lastBatchId = memorySink.latestBatchId.get
+        }
 
-      /** Check how many batches have executed since the last time this check was made */
-      var lastBatchId = -1L
-      def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
-        require(lastBatchId >= 0)
-        assert(memorySink.latestBatchId.get === lastBatchId + numBatches)
+        checkLastBatchData('c')  // (a and b) should be in batch 1, (c) should be in batch 2 (last)
+        checkAllData('a' to 'c')
         lastBatchId = memorySink.latestBatchId.get
-      }
 
-      checkLastBatchData(3)  // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
-      checkAllData(1 to 3)
-      lastBatchId = memorySink.latestBatchId.get
+        fileSource.withBatchingLocked {
+          createFile("d")
+          createFile("e")   // d and e should be in a batch
+          createFile("f")
+          createFile("g")   // f and g should be in the last batch
+        }
+        q.processAllAvailable()
+        checkNumBatchesSinceLastCheck(2)
+        checkLastBatchData('f', 'g')
+        checkAllData('a' to 'g')
+
+        fileSource.withBatchingLocked {
+          createFile("h")
+          createFile("i")    // h and i should be in a batch
+          createFile("j")
+          createFile("k")   // j and k should be in a batch
+          createFile("l")   // l should be in the last batch
+        }
+        q.processAllAvailable()
+        checkNumBatchesSinceLastCheck(3)
+        checkLastBatchData('l')
+        checkAllData('a' to 'l')
 
-      fileSource.withBatchingLocked {
-        createFile(4)
-        createFile(5)   // 4 and 5 should be in a batch
-        createFile(6)
-        createFile(7)   // 6 and 7 should be in the last batch
-      }
-      q.processAllAvailable()
-      checkNumBatchesSinceLastCheck(2)
-      checkLastBatchData(6, 7)
-      checkAllData(1 to 7)
-
-      fileSource.withBatchingLocked {
-        createFile(8)
-        createFile(9)    // 8 and 9 should be in a batch
-        createFile(10)
-        createFile(11)   // 10 and 11 should be in a batch
-        createFile(12)   // 12 should be in the last batch
+        q.stop()
       }
-      q.processAllAvailable()
-      checkNumBatchesSinceLastCheck(3)
-      checkLastBatchData(12)
-      checkAllData(1 to 12)
-
-      q.stop()
     }
   }
 
-  testQuietly("max files per trigger - incorrect values") {
-    val testTable = "maxFilesPerTrigger_test"
-    withTable(testTable) {
-      withTempDir { case src =>
-        def testMaxFilePerTriggerValue(value: String): Unit = {
-          val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
-          val e = intercept[StreamingQueryException] {
-            // Note: `maxFilesPerTrigger` is checked in the stream thread when creating the source
-            val q = df.writeStream.format("memory").queryName(testTable).start()
-            try {
-              q.processAllAvailable()
-            } finally {
-              q.stop()
+  testQuietly("max bytes per trigger & max files per trigger - incorrect values") {
+    Seq(
+      ("maxBytesPerTrigger_test", "maxBytesPerTrigger"),
+      ("maxFilesPerTrigger_test", "maxFilesPerTrigger")
+    ).foreach { case (testTable, optionName) =>
+      withTable(testTable) {
+        withTempDir { case src =>
+          def testMaxFilePerTriggerValue(value: String): Unit = {
+            val df = spark.readStream.option(optionName, value).text(src.getCanonicalPath)
+            val e = intercept[StreamingQueryException] {
+              // Note: a tested option is checked in the stream thread when creating the source
+              val q = df.writeStream.format("memory").queryName(testTable).start()
+              try {
+                q.processAllAvailable()
+              } finally {
+                q.stop()
+              }
+            }
+            assert(e.getCause.isInstanceOf[IllegalArgumentException])
+            Seq(optionName, value, "positive integer").foreach { s =>
+              assert(e.getMessage.contains(s))
             }
           }
-          assert(e.getCause.isInstanceOf[IllegalArgumentException])
-          Seq("maxFilesPerTrigger", value, "positive integer").foreach { s =>
-            assert(e.getMessage.contains(s))
-          }
-        }
 
-        testMaxFilePerTriggerValue("not-a-integer")
-        testMaxFilePerTriggerValue("-1")
-        testMaxFilePerTriggerValue("0")
-        testMaxFilePerTriggerValue("10.1")
+          testMaxFilePerTriggerValue("not-a-integer")
+          testMaxFilePerTriggerValue("-1")
+          testMaxFilePerTriggerValue("0")
+          testMaxFilePerTriggerValue("10.1")
+        }
       }
     }
   }
 
-  test("SPARK-30669: maxFilesPerTrigger - ignored when using Trigger.Once") {
-    withTempDirs { (src, target) =>
-      val checkpoint = new File(target, "chk").getCanonicalPath
-      val targetDir = new File(target, "data").getCanonicalPath
-      var lastFileModTime: Option[Long] = None
-
-      /** Create a text file with a single data item */
-      def createFile(data: Int): File = {
-        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
-        if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
-        lastFileModTime = Some(file.lastModified)
-        file
+  testQuietly("max bytes per trigger & max files per trigger - both set") {
+    val testTable = "maxBytesPerTrigger_maxFilesPerTrigger_test"
+    withTable(testTable) {
+      withTempDir { case src =>
+        val df = spark.readStream
+          .option("maxBytesPerTrigger", "1")
+          .option("maxFilesPerTrigger", "1")
+          .text(src.getCanonicalPath)
+        val e = intercept[StreamingQueryException] {
+          // Note: a tested option is checked in the stream thread when creating the source
+          val q = df.writeStream.format("memory").queryName(testTable).start()
+          try {
+            q.processAllAvailable()
+          } finally {
+            q.stop()
+          }
+        }
+        assert(e.getCause.isInstanceOf[IllegalArgumentException])
+        Seq("maxBytesPerTrigger", "maxFilesPerTrigger", "can't be both set").foreach { s =>
+          assert(e.getMessage.contains(s))
+        }
       }
+    }
+  }
 
-      createFile(1)
-      createFile(2)
-      createFile(3)
-
-      // Set up a query to read text files one at a time
-      val df = spark
-        .readStream
-        .option("maxFilesPerTrigger", 1)
-        .text(src.getCanonicalPath)
+  test("SPARK-30669: maxFilesPerTrigger & maxBytesPerTrigger - ignored when using Trigger.Once") {

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1470767776


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -166,6 +197,25 @@ class FileStreamSource(
         // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
         (newFiles.take(files.maxFiles()), null)
 
+      case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+        // we can cache and reuse remaining fetched list of files in further batches
+        val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
+          takeFilesUntilMax(newFiles, files.maxBytes())
+        if (rSize < (files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong) {

Review Comment:
   Hmm, if `BigInt` is used to avoid overflow, does it make sense to cast it to `Long` to compare here? `rSize` might be larger than long range.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1480254474


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -166,6 +197,25 @@ class FileStreamSource(
         // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
         (newFiles.take(files.maxFiles()), null)
 
+      case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+        // we can cache and reuse remaining fetched list of files in further batches
+        val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
+          takeFilesUntilMax(newFiles, files.maxBytes())
+        if (rSize < (files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong) {

Review Comment:
   I think we can rename it to `DISCARD_UNSEEN_INPUT_RATIO` for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1452773164


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +117,32 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   *  Split files into a selected/unselected pair according to a total size threshold.
+   */
+  private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long):
+    (Seq[NewFileEntry], Seq[NewFileEntry]) = {
+    var idx = 0
+    var totalSize = 0L
+    val (bFiles, usFiles) = files.span { case NewFileEntry(_, size, _) =>
+      idx += 1
+      totalSize += size
+      idx == 1 || totalSize <= maxSize

Review Comment:
   This is better than the silent overflow. However, this looks insufficient to me because this new code enforces Apache Spark community to add a new migration documentation about this kind of new regression .
   
   Although I understand why you think the overflow is unlikely and are reluctant to change the logic, the Apache Spark community shares this code among the world-wide users. We cannot merge this kind of regression. 
   
   To simply put, when we able to handle handle this gracefully without any exception and regression, why do we choose this logic and expose the users the risk of production job failures due to `ArithmeticException`, @MaxNevermind ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1480258634


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -166,6 +197,25 @@ class FileStreamSource(
         // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
         (newFiles.take(files.maxFiles()), null)
 
+      case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+        // we can cache and reuse remaining fetched list of files in further batches
+        val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
+          takeFilesUntilMax(newFiles, files.maxBytes())
+        if (rSize < (files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong) {

Review Comment:
   I don't mean there will be truncation on `files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong`, but `rSize.toLong`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1449776458


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -166,6 +186,23 @@ class FileStreamSource(
         // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
         (newFiles.take(files.maxFiles()), null)
 
+      case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+        // we can cache and reuse remaining fetched list of files in further batches
+        val (bFiles, usFiles) = takeFilesUntilMax(newFiles, files.maxBytes())
+        if (usFiles.map(_.size).sum < files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO) {

Review Comment:
   I introduced `usFilesSize` that computed using `Math.addExact`. Let me know if that works. The reason I would like to leave this this way without modifying `takeFilesUntilMax` is because I want to preserve the model of `val (bFiles, usFiles) = ...` computation similar to existing `ReadMaxFiles` few lines above as it is pretty clean and simple to understand and reason about. Let me know what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1886147894

   Addressed some of the issues, will work on the tests a little bit later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1449769428


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +117,32 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   *  Split files into a selected/unselected pair according to a total size threshold.
+   */
+  private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long):
+    (Seq[NewFileEntry], Seq[NewFileEntry]) = {
+    var idx = 0
+    var totalSize = 0L
+    val (bFiles, usFiles) = files.span { case NewFileEntry(_, size, _) =>
+      idx += 1
+      totalSize += size
+      idx == 1 || totalSize <= maxSize

Review Comment:
   I used Math.addExact to deal with overflow here in another place. Let's me know if that approach is acceptable. I also found Spark's MathUtils. Not 100% certain which way is the better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1888393009

   @dongjoon-hyun 
   I've added more fixes and resolved related conversations but there are still some open questions. Please read through my comments, explanations and questions and let me know if we can proceed with current solution or I need to tweak it more.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448082584


##########
docs/structured-streaming-programming-guide.md:
##########
@@ -561,6 +561,8 @@ Here are the details of all the sources in Spark.
         <br/>
         <code>maxFilesPerTrigger</code>: maximum number of new files to be considered in every trigger (default: no max)
         <br/>
+        <code>maxBytesPerTrigger</code>: maximum total size of new files to be considered in every trigger (default: no max). maxBytesPerTrigger and maxFilesPerTrigger can't both be set at the same time, only one of two must be chosen.

Review Comment:
   nit, codify?
   ```
   - maxBytesPerTrigger and maxFilesPerTrigger can't both
   + <code>maxBytesPerTrigger</code> and <code>maxFilesPerTrigger</code> can't both
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-XXXX][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1445691484


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala:
##########
@@ -50,11 +50,25 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
     }
   }
 
+  val maxBytesPerTrigger: Option[Int] = parameters.get("maxBytesPerTrigger").map { str =>
+    Try(str.toInt).toOption.filter(_ > 0).map(op =>
+      if (maxFilesPerTrigger.nonEmpty) {
+        throw new IllegalArgumentException(
+          s"Options 'maxFilesPerTrigger' and 'maxBytesPerTrigger' " +
+            s"can't be both set at the same time")

Review Comment:
   ```suggestion
             "Options 'maxFilesPerTrigger' and 'maxBytesPerTrigger' " +
               "can't be both set at the same time")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1452773164


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +117,32 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   *  Split files into a selected/unselected pair according to a total size threshold.
+   */
+  private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long):
+    (Seq[NewFileEntry], Seq[NewFileEntry]) = {
+    var idx = 0
+    var totalSize = 0L
+    val (bFiles, usFiles) = files.span { case NewFileEntry(_, size, _) =>
+      idx += 1
+      totalSize += size
+      idx == 1 || totalSize <= maxSize

Review Comment:
   This is better than the silent overflow. However, this looks insufficient to me because this new code enforces Apache Spark community to add a new migration documentation about this kind of new regression .
   
   Although I understand why you think the overflow is unlikely and are reluctant to change the logic, the Apache Spark community shares this code among the world-wide users. We cannot merge this kind of regression. 
   
   To simply put, when we able to handle this new feature gracefully without any exception and regression, why do we choose this logic and expose the users the risk of production job failures due to `ArithmeticException`, @MaxNevermind ? 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +117,32 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   *  Split files into a selected/unselected pair according to a total size threshold.
+   */
+  private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long):
+    (Seq[NewFileEntry], Seq[NewFileEntry]) = {
+    var idx = 0
+    var totalSize = 0L
+    val (bFiles, usFiles) = files.span { case NewFileEntry(_, size, _) =>
+      idx += 1
+      totalSize += size
+      idx == 1 || totalSize <= maxSize

Review Comment:
   This is better than the silent overflow. However, this looks insufficient to me because this new code enforces Apache Spark community to add a new migration documentation about this kind of new regression .
   
   Although I understand why you think the overflow is unlikely and are reluctant to change the logic, the Apache Spark community shares this code among the world-wide users. We cannot merge this kind of regression. 
   
   To simply put, when we able to handle this new feature gracefully without any exception and regression, why do we choose the AS-IS PR's logic and expose the users the risk of production job failures due to `ArithmeticException`, @MaxNevermind ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1470707325


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1154,25 +1154,33 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }
 
   test("max files per trigger") {
+    testThresholdLogic("maxFilesPerTrigger")
+  }
+
+  test("SPARK-46641: max bytes per trigger") {
+    testThresholdLogic("maxBytesPerTrigger")
+  }
+
+  private def testThresholdLogic(option: String): Unit = {
     withTempDir { case src =>
       var lastFileModTime: Option[Long] = None
 
       /** Create a text file with a single data item */

Review Comment:
   Could you add this backward-compatible method?
   ```scala
         def createFile(data: Int): File = {
           createFile(data.toString)
         }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1473284940


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +118,42 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   * Split files into a selected/unselected pair according to a total size threshold.
+   * Always puts the 1st element in a left split and keep adding it to a left split
+   * until reaches a specified threshold or [[Long.MaxValue]].
+   */
+  private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long):
+  (FilesSplit, FilesSplit) = {

Review Comment:
   Yes,  Rollback on the unnecessary change is the recommended way. Only new code should follow the coding style.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1925504531

   To @MaxNevermind , this is still under review technically because the previous approval is not for the last commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1925529091

   @dongjoon-hyun 
   Should I re-request reviews?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1452773164


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +117,32 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   *  Split files into a selected/unselected pair according to a total size threshold.
+   */
+  private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long):
+    (Seq[NewFileEntry], Seq[NewFileEntry]) = {
+    var idx = 0
+    var totalSize = 0L
+    val (bFiles, usFiles) = files.span { case NewFileEntry(_, size, _) =>
+      idx += 1
+      totalSize += size
+      idx == 1 || totalSize <= maxSize

Review Comment:
   This is better than the silent overflow. However, this looks insufficient to me because this new code enforces Apache Spark community to add a new migration documentation about this kind of new regression .
   
   Although I understand why you think the overflow is unlikely and reluctant to change the logic, the Apache Spark community shares this code among the world-wide users. We cannot merge this kind of regression. 
   
   To simply put, when we able to handle handle this gracefully without any exception and regression, why do we choose this logic and expose the users the risk of production job failures due to `ArithmeticException`, @MaxNevermind ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1903098902

   @dongjoon-hyun 
   Could you please review again?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1470687917


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -96,7 +100,8 @@ class FileStreamSource(
       implicitly[Ordering[Long]]
     }
 
-  private val maxFileAgeMs: Long = if (sourceOptions.latestFirst && maxFilesPerBatch.isDefined) {
+  private val maxFileAgeMs: Long = if (
+    sourceOptions.latestFirst && (maxFilesPerBatch.isDefined || maxBytesPerBatch.isDefined)) {

Review Comment:
   nit.
   ```scala
   - private val maxFileAgeMs: Long = if (
   -   sourceOptions.latestFirst && (maxFilesPerBatch.isDefined || maxBytesPerBatch.isDefined)) {
   + private val maxFileAgeMs: Long = if (sourceOptions.latestFirst &&
   +     (maxFilesPerBatch.isDefined || maxBytesPerBatch.isDefined)) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1916206170

   Sorry for being late. I'm review now, @MaxNevermind .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-XXXX][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1882502273

   @MaxNevermind Could you create a Jira ticket and replace `SPARK-XXXX` with Jira ticket number in the PR title? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-XXXX][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1883698927

   I think your Jira account was created now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1447960178


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1777,6 +1817,19 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+
+  test("SPARK-19813: Ignore maxFileAge when maxBytesPerTrigger and latestFirst is used") {

Review Comment:
   `maxFilesPerTrigger` was removed from the test title. We should keep it as before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1447960970


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1777,6 +1817,19 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+
+  test("SPARK-19813: Ignore maxFileAge when maxBytesPerTrigger and latestFirst is used") {

Review Comment:
   Oh, nvm, just found that this is a new test added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448101899


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -1153,174 +1153,213 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("max files per trigger") {
-    withTempDir { case src =>
-      var lastFileModTime: Option[Long] = None
+  test("maxFilesPerTrigger & maxBytesPerTrigger threshold logic must be obeyed") {
+    Seq(
+      "maxFilesPerTrigger",
+      "maxBytesPerTrigger"
+    ).foreach{ thresholdOption =>
+      withTempDir { case src =>
+        var lastFileModTime: Option[Long] = None
+
+        /** Create a text file with a single data item */
+        def createFile(data: String): File = {
+          val file = stringToFile(new File(src, s"$data.txt"), data)
+          if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
+          lastFileModTime = Some(file.lastModified)
+          file
+        }
 
-      /** Create a text file with a single data item */
-      def createFile(data: Int): File = {
-        val file = stringToFile(new File(src, s"$data.txt"), data.toString)
-        if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
-        lastFileModTime = Some(file.lastModified)
-        file
-      }
+        createFile("a")
+        createFile("b")
+        createFile("c")
 
-      createFile(1)
-      createFile(2)
-      createFile(3)
+        // Set up a query to read text files 2 at a time
+        val df = spark
+          .readStream
+          .option(thresholdOption, 2)
+          .text(src.getCanonicalPath)
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName("file_data")
+          .start()
+          .asInstanceOf[StreamingQueryWrapper]
+          .streamingQuery
+        q.processAllAvailable()
+        val memorySink = q.sink.asInstanceOf[MemorySink]
+        val fileSource = getSourcesFromStreamingQuery(q).head
+
+        /** Check the data read in the last batch */
+        def checkLastBatchData(data: Char*): Unit = {
+          val schema = StructType(Seq(StructField("value", StringType)))
+          val df = spark.createDataFrame(
+            spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
+          checkAnswer(df, data.map(_.toString).toDF("value"))
+        }
 
-      // Set up a query to read text files 2 at a time
-      val df = spark
-        .readStream
-        .option("maxFilesPerTrigger", 2)
-        .text(src.getCanonicalPath)
-      val q = df
-        .writeStream
-        .format("memory")
-        .queryName("file_data")
-        .start()
-        .asInstanceOf[StreamingQueryWrapper]
-        .streamingQuery
-      q.processAllAvailable()
-      val memorySink = q.sink.asInstanceOf[MemorySink]
-      val fileSource = getSourcesFromStreamingQuery(q).head
-
-      /** Check the data read in the last batch */
-      def checkLastBatchData(data: Int*): Unit = {
-        val schema = StructType(Seq(StructField("value", StringType)))
-        val df = spark.createDataFrame(
-          spark.sparkContext.makeRDD(memorySink.latestBatchData), schema)
-        checkAnswer(df, data.map(_.toString).toDF("value"))
-      }
+        def checkAllData(data: Seq[Char]): Unit = {
+          val schema = StructType(Seq(StructField("value", StringType)))
+          val df = spark.createDataFrame(
+            spark.sparkContext.makeRDD(memorySink.allData), schema)
+          checkAnswer(df, data.map(_.toString).toDF("value"))
+        }
 
-      def checkAllData(data: Seq[Int]): Unit = {
-        val schema = StructType(Seq(StructField("value", StringType)))
-        val df = spark.createDataFrame(
-          spark.sparkContext.makeRDD(memorySink.allData), schema)
-        checkAnswer(df, data.map(_.toString).toDF("value"))
-      }
+        /** Check how many batches have executed since the last time this check was made */
+        var lastBatchId = -1L
+        def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
+          require(lastBatchId >= 0)
+          assert(memorySink.latestBatchId.get === lastBatchId + numBatches)
+          lastBatchId = memorySink.latestBatchId.get
+        }
 
-      /** Check how many batches have executed since the last time this check was made */
-      var lastBatchId = -1L
-      def checkNumBatchesSinceLastCheck(numBatches: Int): Unit = {
-        require(lastBatchId >= 0)
-        assert(memorySink.latestBatchId.get === lastBatchId + numBatches)
+        checkLastBatchData('c')  // (a and b) should be in batch 1, (c) should be in batch 2 (last)
+        checkAllData('a' to 'c')
         lastBatchId = memorySink.latestBatchId.get
-      }
 
-      checkLastBatchData(3)  // (1 and 2) should be in batch 1, (3) should be in batch 2 (last)
-      checkAllData(1 to 3)
-      lastBatchId = memorySink.latestBatchId.get
+        fileSource.withBatchingLocked {
+          createFile("d")
+          createFile("e")   // d and e should be in a batch
+          createFile("f")
+          createFile("g")   // f and g should be in the last batch
+        }
+        q.processAllAvailable()
+        checkNumBatchesSinceLastCheck(2)
+        checkLastBatchData('f', 'g')
+        checkAllData('a' to 'g')
+
+        fileSource.withBatchingLocked {
+          createFile("h")
+          createFile("i")    // h and i should be in a batch
+          createFile("j")
+          createFile("k")   // j and k should be in a batch
+          createFile("l")   // l should be in the last batch
+        }
+        q.processAllAvailable()
+        checkNumBatchesSinceLastCheck(3)
+        checkLastBatchData('l')
+        checkAllData('a' to 'l')
 
-      fileSource.withBatchingLocked {
-        createFile(4)
-        createFile(5)   // 4 and 5 should be in a batch
-        createFile(6)
-        createFile(7)   // 6 and 7 should be in the last batch
-      }
-      q.processAllAvailable()
-      checkNumBatchesSinceLastCheck(2)
-      checkLastBatchData(6, 7)
-      checkAllData(1 to 7)
-
-      fileSource.withBatchingLocked {
-        createFile(8)
-        createFile(9)    // 8 and 9 should be in a batch
-        createFile(10)
-        createFile(11)   // 10 and 11 should be in a batch
-        createFile(12)   // 12 should be in the last batch
+        q.stop()
       }
-      q.processAllAvailable()
-      checkNumBatchesSinceLastCheck(3)
-      checkLastBatchData(12)
-      checkAllData(1 to 12)
-
-      q.stop()
     }
   }
 
-  testQuietly("max files per trigger - incorrect values") {
-    val testTable = "maxFilesPerTrigger_test"
-    withTable(testTable) {
-      withTempDir { case src =>
-        def testMaxFilePerTriggerValue(value: String): Unit = {
-          val df = spark.readStream.option("maxFilesPerTrigger", value).text(src.getCanonicalPath)
-          val e = intercept[StreamingQueryException] {
-            // Note: `maxFilesPerTrigger` is checked in the stream thread when creating the source
-            val q = df.writeStream.format("memory").queryName(testTable).start()
-            try {
-              q.processAllAvailable()
-            } finally {
-              q.stop()
+  testQuietly("max bytes per trigger & max files per trigger - incorrect values") {

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448236171


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMaxBytes.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan files which total
+ * size doesn't go beyond a given maximum total size. Always reads at least one file so a stream

Review Comment:
   fixed



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -379,6 +418,9 @@ object FileStreamSource {
     def sparkPath: SparkPath = SparkPath.fromUrlString(path)
   }
 
+  /** Newly fetched files metadata holder. */
+  private case class NewFileEntry(path: SparkPath, size: Long, timestamp: Long)

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448240399


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +117,32 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   *  Split files into a selected/unselected pair according to a total size threshold.
+   */
+  private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long):
+    (Seq[NewFileEntry], Seq[NewFileEntry]) = {
+    var idx = 0
+    var totalSize = 0L
+    val (bFiles, usFiles) = files.span { case NewFileEntry(_, size, _) =>
+      idx += 1
+      totalSize += size
+      idx == 1 || totalSize <= maxSize

Review Comment:
   As I understand overflow might happen if a sum of input file sizes(in bytes) will be bigger that Long.MaxValue in bytes which is ~ 9 000 PB. Should we consider that a possibility?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "MaxNevermind (via GitHub)" <gi...@apache.org>.
MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1448240399


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -113,16 +117,32 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _
 
   metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
-  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+  logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
+    s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")
+
+  private var unreadFiles: Seq[NewFileEntry] = _
 
-  private var unreadFiles: Seq[(SparkPath, Long)] = _
+  /**
+   *  Split files into a selected/unselected pair according to a total size threshold.
+   */
+  private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long):
+    (Seq[NewFileEntry], Seq[NewFileEntry]) = {
+    var idx = 0
+    var totalSize = 0L
+    val (bFiles, usFiles) = files.span { case NewFileEntry(_, size, _) =>
+      idx += 1
+      totalSize += size
+      idx == 1 || totalSize <= maxSize

Review Comment:
   As I understand overflow might happen if a sum of input file sizes(in bytes) will be bigger that Long.MaxValue in bytes which is ~ 9PB. Should we consider that a possibility?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1936301000

   Thank you @MaxNevermind @dongjoon-hyun 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org