Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/07 11:14:44 UTC

[GitHub] [spark] LuciferYang commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

LuciferYang commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042078520


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map(x => s"foo-$x").toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // recreate the topic with fewer partitions than the end state the Kafka data source computed
+          testUtils.createTopic(topic, partitions = 3)
+          // offsets remain the same as before the topic was recreated
+          testUtils.sendMessages(topic, (0 until 15).map(x => s"foo-$x").toArray, Some(0))
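+          // returning Unit explicitly below presumably steers overload resolution
+          // to the Scala foreachBatch overload rather than the Java VoidFunction2 one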
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +

Review Comment:
   Could the new test case use `checkError()` to verify the `errorClass` and `parameters`, as is done elsewhere in Spark?
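   
   For reference, the pattern would look roughly like the sketch below (assuming the PR raises a `SparkThrowable` with a registered error class). The error class name and parameter key here are hypothetical placeholders, not the actual values this PR would register:
   
   ```scala
   // Hypothetical sketch of the checkError() pattern used elsewhere in Spark tests.
   // The errorClass string and the parameter key are placeholders, not real
   // registered values. Assumes import org.apache.spark.SparkThrowable and that
   // the query's failure cause implements SparkThrowable.
   val exc = intercept[StreamingQueryException] {
     val query = startTriggerAvailableNowQuery()
     try {
       assert(query.awaitTermination(streamingTimeout.toMillis))
     } finally {
       query.stop()
     }
   }
   checkError(
     exception = exc.getCause.asInstanceOf[SparkThrowable],
     errorClass = "KAFKA_LOST_TOPIC_PARTITIONS_IN_END_OFFSET",   // placeholder name
     parameters = Map("topicPartitions" -> "<expected value>"))  // placeholder key
   ```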
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

