Posted to commits@spark.apache.org by ka...@apache.org on 2021/08/20 01:43:28 UTC

[spark] branch branch-3.2 updated: [SPARK-35312][SS][FOLLOW-UP] More documents and checking logic for the new options

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 36c24a0  [SPARK-35312][SS][FOLLOW-UP] More documents and checking logic for the new options
36c24a0 is described below

commit 36c24a03bd6d00beb65377d37855544f406d1f18
Author: Yuanjian Li <yu...@databricks.com>
AuthorDate: Fri Aug 20 10:41:42 2021 +0900

    [SPARK-35312][SS][FOLLOW-UP] More documents and checking logic for the new options
    
    ### What changes were proposed in this pull request?
    Add more documentation and checking logic for the new options `minOffsetsPerTrigger` and `maxTriggerDelay`.
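
    For illustration, a streaming query could combine these options as follows (a minimal
    sketch; the topic and bootstrap-server values are placeholders):

    ```scala
    // Wait for at least 100 new offsets before firing a trigger, cap each
    // trigger at 500 offsets, and fire anyway once 10 minutes have passed
    // without reaching the minimum.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "events")
      .option("minOffsetsPerTrigger", "100")
      .option("maxOffsetsPerTrigger", "500")
      .option("maxTriggerDelay", "10m")
      .load()
    ```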
    
    ### Why are the changes needed?
    To give a clear description of the behavior introduced in SPARK-35312.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    If the user sets minOffsetsPerTrigger > maxOffsetsPerTrigger, the new code throws an IllegalArgumentException. The original behavior was to silently ignore maxOffsetsPerTrigger.
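
    For example, a combination like the following (the values mirror the new test case;
    topic and server names are placeholders) is now rejected when the source options are
    validated:

    ```scala
    // minOffsetsPerTrigger (20) > maxOffsetsPerTrigger (15): previously
    // maxOffsetsPerTrigger was silently ignored; with this change the query
    // fails with an IllegalArgumentException.
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "events")
      .option("minOffsetsPerTrigger", "20")
      .option("maxOffsetsPerTrigger", "15")
      .load()
    ```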
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #33792 from xuanyuanking/SPARK-35312-follow.
    
    Authored-by: Yuanjian Li <yu...@databricks.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
    (cherry picked from commit a0b24019edcd268968a7e0074b0a54988e408699)
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 docs/structured-streaming-kafka-integration.md            | 15 +++++++++------
 .../apache/spark/sql/kafka010/KafkaSourceProvider.scala   | 13 +++++++++++++
 .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala   |  4 ++++
 3 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 6926bbb..0ec359f 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -477,23 +477,26 @@ The following configurations are optional:
   <td>maxOffsetsPerTrigger</td>
   <td>long</td>
   <td>none</td>
-  <td>streaming and batch</td>
+  <td>streaming query</td>
   <td>Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td>
 </tr>
 <tr>
   <td>minOffsetsPerTrigger</td>
   <td>long</td>
   <td>none</td>
-  <td>streaming and batch</td>
-  <td>Minimum number of offsets to be processed per trigger interval. The specified total number of offsets will
-   be proportionally split across topicPartitions of different volume.</td>
+  <td>streaming query</td>
+  <td>Minimum number of offsets to be processed per trigger interval. The specified total number of
+  offsets will be proportionally split across topicPartitions of different volume. Note that if
+  maxTriggerDelay is exceeded, a trigger will be fired even if the number of available offsets
+  doesn't reach minOffsetsPerTrigger.</td>
 </tr>
 <tr>
   <td>maxTriggerDelay</td>
   <td>time with units</td>
   <td>15m</td>
-  <td>streaming and batch</td>
-  <td>Maximum amount of time for which trigger can be delayed between two triggers provided some data is available from the source.</td>
+  <td>streaming query</td>
+  <td>Maximum amount of time for which the trigger can be delayed between two triggers, provided some
+  data is available from the source. This option is only applicable if minOffsetsPerTrigger is set.</td>
 </tr>
 <tr>
   <td>minPartitions</td>
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 38803b7..4a75ab0 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -322,6 +322,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         s"Option 'kafka.${ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}' must be specified for " +
           s"configuring Kafka consumer")
     }
+
+    if (params.contains(MIN_OFFSET_PER_TRIGGER) && params.contains(MAX_OFFSET_PER_TRIGGER)) {
+      val minOffsets = params.get(MIN_OFFSET_PER_TRIGGER).get.toLong
+      val maxOffsets = params.get(MAX_OFFSET_PER_TRIGGER).get.toLong
+      if (minOffsets > maxOffsets) {
+        throw new IllegalArgumentException(s"The value of minOffsetsPerTrigger($minOffsets) is " +
+          s"higher than the maxOffsetsPerTrigger($maxOffsets).")
+      }
+    }
   }
 
   private def validateStreamOptions(params: CaseInsensitiveMap[String]) = {
@@ -382,6 +391,10 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     if (params.contains(MIN_OFFSET_PER_TRIGGER)) {
       logWarning("minOffsetsPerTrigger option ignored in batch queries")
     }
+
+    if (params.contains(MAX_TRIGGER_DELAY)) {
+      logWarning("maxTriggerDelay option ignored in batch queries")
+    }
   }
 
   class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index d9fad5e..f61696f 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1908,6 +1908,10 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
     testBadOptions("assign" -> "")("no topicpartitions to assign")
     testBadOptions("subscribe" -> "")("no topics to subscribe")
     testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
+    testBadOptions(
+      "kafka.bootstrap.servers" -> "fake", "subscribe" -> "t", "minOffsetsPerTrigger" -> "20",
+      "maxOffsetsPerTrigger" -> "15")(
+      "value of minOffsetPerTrigger(20) is higher than the maxOffsetsPerTrigger(15)")
   }
 
   test("unsupported kafka configs") {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org