Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/05 07:33:12 UTC

[GitHub] [spark] HeartSaVioR opened a new pull request, #38911: [SPARK-41387][SS] Add defensive assertions to Kafka data source for Trigger.AvailableNow

HeartSaVioR opened a new pull request, #38911:
URL: https://github.com/apache/spark/pull/38911

   ### What changes were proposed in this pull request?
   
   This PR proposes to add defensive assertions to the Kafka data source for Trigger.AvailableNow, so that the query fails fast instead of running indefinitely if a bug ever slips into the Kafka data source.
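
   For illustration, a minimal sketch (not the PR's verbatim code) of the fail-fast pattern these assertions follow; `assertInvariant` is a hypothetical helper:

   ```scala
   // If an invariant that Trigger.AvailableNow relies on is broken by a future
   // bug, stop the query with a descriptive error instead of letting it poll
   // forever for an end offset it can never reach.
   def assertInvariant(holds: Boolean, detail: => String): Unit = {
     if (!holds) {
       throw new IllegalStateException(
         s"Invariant violated for Trigger.AvailableNow: $detail")
     }
   }
   ```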
   
   ### Why are the changes needed?
   
   This change acts as a safeguard: a future bug in the Kafka data source won't be able to cause an infinitely running query.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing tests.




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041866576


##########
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##########
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {
+    "message" : [
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in pre-fetched offset to end offset for each microbatch. ",
+      "topic-partitions for pre-fetched offset: <tpsForPrefetched>, topic-partitions for end offset: <tpsForEndOffset>."
+    ]
+  },
+  "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED" : {
+    "message" : [
+      "For Kafka data source with Trigger.AvailableNow, end offset should have lower or equal offset per each topic partition than pre-fetched offset.",
+      "pre-fetched offset: <prefetchedOffset>, end offset: <endOffset>."
+    ]
+  },
+  "LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW" : {
+    "message" : [
+      "Some of partitions in Kafka topic(s) have been lost during running query with Trigger.AvailableNow. Make sure topic partitions are not dropped during the query run.",

Review Comment:
   Yup, it's probably not a common case for users to drop (and recreate) a topic that is being read by a running query. Better guidance may be to simply say: restart your query and it should work fine.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041779007


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,54 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+    assert(tpsForPrefetched == tpsForEndOffset,
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+        "pre-fetched offset to end offset for each microbatch. " +
+        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+        s"topic-partitions for end offset: $tpsForEndOffset.")
+
+    val endOffsetHasGreaterOrEqualOffsetComparedToPrefetched = {
+      allDataForTriggerAvailableNow.keySet.forall { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset <= offsetFromPrefetched
+      }
+    }
+    assert(endOffsetHasGreaterOrEqualOffsetComparedToPrefetched,

Review Comment:
   I just integrated the error class framework (a separate one for the Kafka data source) and made the errors non-internal (user-facing).





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1047895867


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(

Review Comment:
   Yeah, my initial intention was to guard against code bugs, but the check now covers more than that. Good point.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042916237


##########
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##########
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {

Review Comment:
   I just went with MISMATCHED_TOPIC_PARTITIONS_BETWEEN_END_OFFSET_AND_PREFETCHED, although it is still lengthy. I probably can't improve it further until I understand the purpose and the preferred convention, i.e. until the two questions above are answered.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042805980


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map { case x =>
+      s"foo-$x"
+    }.toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // create partitions less than the kafka data source figured out as an end state
+          testUtils.createTopic(topic, partitions = 3)
+          // offset will keep the same
+          testUtils.sendMessages(topic, (0 until 15).map { case x =>
+            s"foo-$x"
+          }.toArray, Some(0))
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +

Review Comment:
   Put yourself in the shoes of a 3rd-party developer and walk through how they would use these classes to integrate their data source with the error class framework. I know there is a README file, but I don't think it's friendly to 3rd-party developers.
   
   https://github.com/apache/spark/tree/master/core/src/main/resources/error/README.md
   
   1. They have to define their own error-class JSON file, as Spark's file is not modifiable.
   2. We guide them to leverage SparkThrowable, but the default implementation is tied to Spark's error class JSON, so they will be surprised that it simply doesn't work and that they have to override every default implementation (see the sketch after this list). There is no documentation for this.
   3. They may understand what an error class is intended to do, but if they have to go with an uncategorized error, they have no idea how to pick an unused sequence number.
   4. I believe classifying internal vs. user-facing errors is one of the key points of the error class framework's UX, but the README doesn't mention it. Someone might have no idea how the framework presents exceptions to end users; if they expect uniform behavior, they will be surprised at how internal errors are handled separately.
   
   None of this error class framework guidance is mentioned in a data source implementation doc (we don't actually have one) or in code comments.
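
   A minimal sketch of point 2, under stated assumptions: SparkThrowable is the trait from org.apache.spark (it exposes getErrorClass), while MyErrorClassReader and MyConnectorException are hypothetical names for what a 3rd-party connector would have to write itself, since the default message rendering (SparkThrowableHelper) only reads Spark's own error class JSON:

   ```scala
   import org.apache.spark.SparkThrowable

   // Hypothetical registry backed by the connector's own error-class JSON
   // (inlined here as a Map to keep the sketch self-contained).
   object MyErrorClassReader {
     private val templates = Map(
       "LOST_TOPIC_PARTITIONS" -> "Partitions <tps> were lost during the query run.")

     def render(errorClass: String, params: Map[String, String]): String =
       params.foldLeft(templates(errorClass)) { case (msg, (k, v)) =>
         msg.replace(s"<$k>", v)
       }
   }

   class MyConnectorException(
       errorClass: String,
       messageParameters: Map[String, String])
     extends Exception(MyErrorClassReader.render(errorClass, messageParameters))
     with SparkThrowable {

     // The defaults are tied to Spark's global error-classes.json, so a
     // connector shipping its own JSON has to override them by hand.
     override def getErrorClass: String = errorClass
   }
   ```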
   





[GitHub] [spark] jerrypeng commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041856979


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   Why do we need to fetch the latest offset from Kafka again? This will add latency. I know it's Trigger.AvailableNow, but what does this buy us?





[GitHub] [spark] rangadi commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1046683120


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   > If failOnDataLoss is turned on, we tolerate the removal of topic partition, and allow offset to go "backward".
   
   You mean 'if not turned on'? If we tolerate this with the 'processing time' trigger, is there any reason we shouldn't tolerate it here (and avoid waiting forever)?
   E.g. we could reset the offsets.
   I am just thinking aloud; we don't need to block on this.



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(

Review Comment:
   Minor: `verifyEndOffsetFor..` might be a better name than `assertEndOffset..`. `assert` has a specific meaning (it usually signals a code bug), whereas this functionality sounds more like verification.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041067010


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala:
##########
@@ -349,6 +354,54 @@ private[kafka010] class KafkaSource(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(

Review Comment:
   In the end we want to remove the DSv1 implementation of the Kafka data source. (There is a semantic issue on the batch side.) Until then, I'd like to keep duplicating the code, as we have been doing.





[GitHub] [spark] HeartSaVioR commented on pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38911:
URL: https://github.com/apache/spark/pull/38911#issuecomment-1347663347

   cc. @zsxwing @xuanyuanking @viirya Friendly reminder.
   cc. @rangadi @LuciferYang @MaxGekk @srielau @jerrypeng Would you mind taking another look?




[GitHub] [spark] HeartSaVioR commented on pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38911:
URL: https://github.com/apache/spark/pull/38911#issuecomment-1342007657

   cc. @zsxwing @xuanyuanking @viirya Friendly reminder.




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041864100


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   It's not safe to just pretend that Kafka resources won't change during the query run. If they do, the query ends up with unexpected behavior; e.g. with consumer-group-based assignment, polling from a non-existent topic partition may lead to a metadata fetch timeout.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042839180


##########
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##########
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {

Review Comment:
   I admit I just wrote a sentence with underscores. But would it work if I used an abbreviation, e.g. TPS for topic partitions, to make it shorter? Which should we prefer: shorter or clearer?
   
   `MISMATCHED_TOPIC_PARTITIONS_BETWEEN_END_OFFSET_AND_PREFETCHED` is still very long, but we can't drop the part naming the two comparison targets, because we have a similar error type comparing different targets.





[GitHub] [spark] HeartSaVioR commented on pull request #38911: [SPARK-41387][SS] Add defensive assertions to Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38911:
URL: https://github.com/apache/spark/pull/38911#issuecomment-1336865300

   cc. @zsxwing @viirya @jerrypeng Please take a look. Thanks in advance!




[GitHub] [spark] jerrypeng commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041848355


##########
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##########
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {
+    "message" : [
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in pre-fetched offset to end offset for each microbatch. ",

Review Comment:
   This should be a transient issue, right? As in, when the job is triggered again it should run fine and will read from whichever partitions exist? Can you add that to this error message?





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041860498


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   If we don't fetch the latest offsets for the latest topic-partitions from Kafka again, what are we trying to guard against? If someone turns on failOnDataLoss and runs the query with Trigger.AvailableNow, it will just move on when a specific topic partition is dropped, meaning it never reaches the end state (the prepared offset). The same applies to topic recreation.
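
   For context, a sketch of the guard being described, continuing the excerpt above (the factory method name `lostTopicPartitionsInEndOffsetWithTriggerAvailableNow` is inferred from the error class in kafka-error-classes.json, not quoted from the PR):

   ```scala
   // Re-fetch the live topic-partitions; any pre-fetched partition that no
   // longer exists means the prepared end state can never be reached, so the
   // query fails fast instead of polling forever.
   val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))
   val lostPartitions = allDataForTriggerAvailableNow.keySet -- latestOffsets.keySet
   if (lostPartitions.nonEmpty) {
     throw KafkaExceptions.lostTopicPartitionsInEndOffsetWithTriggerAvailableNow(
       lostPartitions)
   }
   ```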





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1047895383


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   Yeah, sorry, I meant if it's not turned on (turned off).
   
   > If we tolerate this with the 'processing time' trigger, is there any reason we shouldn't tolerate it here (and avoid waiting forever)?
   > E.g. we could reset the offsets.
   
   The source may be able to do something smart, e.g. the end offset built during preparation could change based on changes to the latest offset, but that would also be tricky. If someone has a bright idea for rebuilding the end offset with failOnDataLoss taken into account, that would be better.





[GitHub] [spark] srielau commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
srielau commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042787247


##########
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##########
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {

Review Comment:
   Still a mouthful..





[GitHub] [spark] MaxGekk commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
MaxGekk commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041866109


##########
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##########
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {

Review Comment:
   Since this could appear in section headers in the docs, could you make it shorter if possible? Also cc @srielau 





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042164208


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map { case x =>
+      s"foo-$x"
+    }.toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // create partitions less than the kafka data source figured out as an end state
+          testUtils.createTopic(topic, partitions = 3)
+          // offset will keep the same
+          testUtils.sendMessages(topic, (0 until 15).map { case x =>
+            s"foo-$x"
+          }.toArray, Some(0))
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +

Review Comment:
   New exceptions don't define a new error class. I don't feel we have established a best practice for applying the error class framework to data sources, especially 3rd-party ones. (Kafka is built-in, of course, but it also serves as a reference implementation for 3rd-party developers.)





[GitHub] [spark] jerrypeng commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042793780


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   These things can happen regardless of whether the AvailableNow trigger is used, right? What happens when they occur with the processing-time trigger?





[GitHub] [spark] HeartSaVioR commented on pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38911:
URL: https://github.com/apache/spark/pull/38911#issuecomment-1354290980

   Thanks! Merging to master.




[GitHub] [spark] jerrypeng commented on a diff in pull request #38911: [SPARK-41387][SS] Add defensive assertions to Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1040618350


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,26 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+    assert(tpsForPrefetched == tpsForEndOffset,
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+        "pre-fetched offset to end offset for each microbatch. " +
+        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+        s"topic-partitions for end offset: $tpsForEndOffset.")
+
+    assert(allDataForTriggerAvailableNow.keySet.forall { tp =>

Review Comment:
   Can we do this check regardless of whether we are using the AvailableNow trigger?





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Add defensive assertions to Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1040675168


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,26 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+    assert(tpsForPrefetched == tpsForEndOffset,
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+        "pre-fetched offset to end offset for each microbatch. " +
+        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+        s"topic-partitions for end offset: $tpsForEndOffset.")
+
+    assert(allDataForTriggerAvailableNow.keySet.forall { tp =>

Review Comment:
   And this never fails with the current implementation of Trigger.AvailableNow (once it figures out the final end offset, it never looks at the available offset range in the real topic again), hence it's not feasible to write a test for it. The check exists to identify future bugs quickly.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042172068


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map { case x =>
+      s"foo-$x"
+    }.toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // create partitions less than the kafka data source figured out as an end state
+          testUtils.createTopic(topic, partitions = 3)
+          // offset will keep the same
+          testUtils.sendMessages(topic, (0 until 15).map { case x =>
+            s"foo-$x"
+          }.toArray, Some(0))
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +

Review Comment:
   E.g. the default implementation of SparkThrowable is tightly coupled with SparkThrowableHelper, whose error class reader is tied to the Spark project's global JSON file. If a 3rd-party data source developer decides to (and technically has to) go with a different error class JSON file, the default implementation no longer works. Maybe there is room for improvement here: make the utility class/object extensible.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042860224


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   The thing is, when the query runs with AvailableNow there is an "end" condition, and if the offsets move in the wrong direction relative to that end condition, the query runs infinitely, which is definitely not what users want. That's a huge difference from the other triggers. Also, the other triggers already fetch the latest information in each batch.
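
   A compact illustration of that end condition (names are illustrative, not Spark internals): the query terminates only once every partition's committed offset reaches the pre-fetched target, so an end offset beyond what the source can ever deliver means the query never stops.

   ```scala
   import org.apache.kafka.common.TopicPartition

   def reachedEnd(
       committed: Map[TopicPartition, Long],
       target: Map[TopicPartition, Long]): Boolean =
     target.forall { case (tp, end) => committed.getOrElse(tp, 0L) >= end }
   ```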





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041856552


##########
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##########
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {
+    "message" : [
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in pre-fetched offset to end offset for each microbatch. ",

Review Comment:
   We are guarding against arbitrary bugs, so there is no 100% guarantee that this would only happen transiently, but I agree that restarting the query would mitigate the issue in most cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042858825


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -194,6 +194,10 @@ private[kafka010] class KafkaMicroBatchStream(
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     val endPartitionOffsets = end.asInstanceOf[KafkaSourceOffset].partitionToOffsets
 
+    if (allDataForTriggerAvailableNow != null) {

Review Comment:
   This is effectively done via the two assertions (topic partitions, offsets) in assertEndOffsetForTriggerAvailableNow.
   
   Do you mean having a sanity check for the non-Trigger.AvailableNow case? KafkaOffsetReader.getOffsetRangesFromResolvedOffsets handles it, and we have to lean on that, because setting failOnDataLoss to false allows the end topic offsets to be smaller than the start topic offsets.
   
   (I would argue it is scary how many fault scenarios that option simply tolerates (ignores), but it's not something we can change.)
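   
   To make that constraint concrete, here is a hedged, self-contained sketch — not the actual internals of `getOffsetRangesFromResolvedOffsets`, whose names and structure differ — of why a strict start <= end assertion cannot hold when failOnDataLoss is false:
   
   ```scala
   // Illustrative only: with failOnDataLoss=false the reader must tolerate an end
   // offset behind the start offset (e.g. after a topic is deleted and recreated)
   // by clamping to an empty range instead of failing the query.
   def resolveRange(
       start: Long,
       end: Long,
       reportDataLoss: String => Unit): (Long, Long) = {
     if (end < start) {
       // reportDataLoss throws when failOnDataLoss=true, otherwise only logs.
       reportDataLoss(s"Offset went backward: start=$start, end=$end")
       (end, end) // tolerated: emit an empty range and move on
     } else {
       (start, end)
     }
   }
   ```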



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR closed pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow
URL: https://github.com/apache/spark/pull/38911


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jerrypeng commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041848809


##########
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##########
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {
+    "message" : [
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in pre-fetched offset to end offset for each microbatch. ",
+      "topic-partitions for pre-fetched offset: <tpsForPrefetched>, topic-partitions for end offset: <tpsForEndOffset>."
+    ]
+  },
+  "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED" : {
+    "message" : [
+      "For Kafka data source with Trigger.AvailableNow, end offset should have lower or equal offset per each topic partition than pre-fetched offset.",
+      "pre-fetched offset: <prefetchedOffset>, end offset: <endOffset>."
+    ]
+  },
+  "LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW" : {
+    "message" : [
+      "Some of partitions in Kafka topic(s) have been lost during running query with Trigger.AvailableNow. Make sure topic partitions are not dropped during the query run.",

Review Comment:
   Same here. A transient error, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] srielau commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
srielau commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042787817


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map { case x =>
+      s"foo-$x"
+    }.toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // create partitions less than the kafka data source figured out as an end state
+          testUtils.createTopic(topic, partitions = 3)
+          // offset will keep the same
+          testUtils.sendMessages(topic, (0 until 15).map { case x =>
+            s"foo-$x"
+          }.toArray, Some(0))
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +

Review Comment:
   Improvement is good. What can we do?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042795449


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   If failOnDataLoss is turned off, we tolerate the removal of a topic partition and allow the offset to go "backward".
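   
   As a paraphrase of that contract — the real helper in the connector is more elaborate, so the names below are illustrative rather than the actual API — the failOnDataLoss branch boils down to:
   
   ```scala
   // failOnDataLoss=true: a detected loss aborts the query.
   // failOnDataLoss=false: the loss is only logged and the query moves on --
   // which is why Trigger.AvailableNow needs assertions that fire regardless.
   def reportDataLoss(message: String, failOnDataLoss: Boolean): Unit = {
     if (failOnDataLoss) {
       throw new IllegalStateException(message)
     } else {
       // The real implementation logs a warning; println keeps the sketch self-contained.
       println(s"WARN: $message. Continuing because failOnDataLoss=false.")
     }
   }
   ```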



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042860224


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   The thing is, when the query runs with Trigger.AvailableNow, there is an "end" condition, and if the offset moves away from the direction needed to reach that end condition, the query runs infinitely. That is a huge difference from other triggers. Also, the other triggers already fetch the latest information in each batch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042892403


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map { case x =>
+      s"foo-$x"
+    }.toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // create partitions less than the kafka data source figured out as an end state
+          testUtils.createTopic(topic, partitions = 3)
+          // offset will keep the same
+          testUtils.sendMessages(topic, (0 until 15).map { case x =>
+            s"foo-$x"
+          }.toArray, Some(0))
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +

Review Comment:
   In any case, the need to improve the error class framework should not be scoped to this PR. We could leverage the Kafka data source to dogfood the 3rd party data source integration, but that needs to be a separate effort.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Add defensive assertions to Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1040641431


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,26 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+    assert(tpsForPrefetched == tpsForEndOffset,
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+        "pre-fetched offset to end offset for each microbatch. " +
+        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+        s"topic-partitions for end offset: $tpsForEndOffset.")
+
+    assert(allDataForTriggerAvailableNow.keySet.forall { tp =>

Review Comment:
   The check is also done in `KafkaOffsetReader.getOffsetRangesFromResolvedOffsets`, but we do not fail the query there when `failondataloss` is set to `false`. We do this specifically for Trigger.AvailableNow since we want to let the query fail even when users set the option `failondataloss` to `false`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38911:
URL: https://github.com/apache/spark/pull/38911#issuecomment-1339489629

   I just made a change to do an "actual" assertion, by fetching the latest topic-partition information and their latest offsets. Hope it makes more sense.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Add defensive assertions to Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1040675168


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,26 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+    assert(tpsForPrefetched == tpsForEndOffset,
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+        "pre-fetched offset to end offset for each microbatch. " +
+        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+        s"topic-partitions for end offset: $tpsForEndOffset.")
+
+    assert(allDataForTriggerAvailableNow.keySet.forall { tp =>

Review Comment:
   And this never fails with the current implementation of Trigger.AvailableNow, hence it's not feasible to create a test for it. This is to identify future bugs quickly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jerrypeng commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042792623


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -194,6 +194,10 @@ private[kafka010] class KafkaMicroBatchStream(
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     val endPartitionOffsets = end.asInstanceOf[KafkaSourceOffset].partitionToOffsets
 
+    if (allDataForTriggerAvailableNow != null) {

Review Comment:
   Can we do a sanity check, i.e. verify that the end topic offsets are larger than the start topic offsets?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042805980


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map { case x =>
+      s"foo-$x"
+    }.toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // create partitions less than the kafka data source figured out as an end state
+          testUtils.createTopic(topic, partitions = 3)
+          // offset will keep the same
+          testUtils.sendMessages(topic, (0 until 15).map { case x =>
+            s"foo-$x"
+          }.toArray, Some(0))
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +

Review Comment:
   Put yourself in a 3rd party developer's shoes and walk through how they would use these classes to integrate their data source with the error class framework. I know there is a README file, but I don't think it's friendly to 3rd party developers.
   
   https://github.com/apache/spark/tree/master/core/src/main/resources/error/README.md
   
   1. They have to define their own error-class.json, as the central file is not modifiable.
   2. We guide them to leverage SparkThrowable, but the default implementation is tied to Spark's error class JSON, so they will be surprised that it just doesn't work and that they have to override every default implementation. There is no documentation for this.
   3. They may understand what an error class intends to do, but if they have to go with an uncategorized error, they have no idea how to ensure they pick an unused sequence number.
   4. I believe classifying internal vs. user-facing errors is one of the key points of the error class framework's UX, but there is no mention of it in the README. Someone would have no idea how the framework will show the exception to end users; if they expect the same treatment, they will be surprised by how we handle internal errors separately. (A rough sketch of the do-it-yourself burden from point 1 follows below.)
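   
   A hedged sketch of that burden — none of this uses Spark internals; the JSON file name, object, and error class below are made up for illustration:
   
   ```scala
   // What a 3rd party ends up hand-rolling today: load message templates (in real
   // code, parsed from a bundled my-error-classes.json) and substitute <name>
   // placeholders in the style of kafka-error-classes.json.
   object ThirdPartyErrors {
     private val messageTemplates: Map[String, String] = Map(
       "LOST_TOPIC_PARTITIONS" ->
         "Partitions <lost> were dropped during the run. Restart the query.")
   
     def errorMessage(errorClass: String, params: Map[String, String]): String = {
       val template = messageTemplates.getOrElse(
         errorClass,
         throw new IllegalArgumentException(s"Unknown error class: $errorClass"))
       params.foldLeft(template) { case (msg, (k, v)) => msg.replace(s"<$k>", v) }
     }
   }
   
   // ThirdPartyErrors.errorMessage("LOST_TOPIC_PARTITIONS", Map("lost" -> "topic-0"))
   // => "Partitions topic-0 were dropped during the run. Restart the query."
   ```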
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] rangadi commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
rangadi commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041594443


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,54 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+    assert(tpsForPrefetched == tpsForEndOffset,
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+        "pre-fetched offset to end offset for each microbatch. " +
+        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+        s"topic-partitions for end offset: $tpsForEndOffset.")
+
+    val endOffsetHasGreaterOrEqualOffsetComparedToPrefetched = {
+      allDataForTriggerAvailableNow.keySet.forall { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset <= offsetFromPrefetched
+      }
+    }
+    assert(endOffsetHasGreaterOrEqualOffsetComparedToPrefetched,

Review Comment:
   Is assert a good way to pass a useful error message to the user?
   Other parts throw compilation or executor-side errors.
   E.g. https://github.com/apache/spark/blob/master/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala#L73
   
   I think assert is only for cases that are not expected at runtime.
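   
   To make the contrast concrete, a small hedged sketch (names invented for illustration) of the two styles — a bare assert versus an explicit, user-facing exception:
   
   ```scala
   // assert yields a bare AssertionError with no error class, and assertions can
   // be compiled away; reserve it for invariants that cannot happen at runtime.
   def checkEndOffsetInternal(prefetched: Long, end: Long): Unit = {
     assert(end <= prefetched, s"end offset $end exceeds pre-fetched offset $prefetched")
   }
   
   // An explicit exception carries a stable, user-facing message -- the direction
   // this review converged on (the PR later moved to KafkaExceptions helpers).
   def checkEndOffsetUserFacing(prefetched: Long, end: Long): Unit = {
     if (end > prefetched) {
       throw new IllegalStateException(
         s"End offset $end must not exceed pre-fetched offset $prefetched.")
     }
   }
   ```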



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041651140


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,54 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+    assert(tpsForPrefetched == tpsForEndOffset,
+      "Kafka data source in Trigger.AvailableNow should provide the same topic partitions in " +
+        "pre-fetched offset to end offset for each microbatch. " +
+        s"topic-partitions for pre-fetched offset: $tpsForPrefetched, " +
+        s"topic-partitions for end offset: $tpsForEndOffset.")
+
+    val endOffsetHasGreaterOrEqualOffsetComparedToPrefetched = {
+      allDataForTriggerAvailableNow.keySet.forall { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset <= offsetFromPrefetched
+      }
+    }
+    assert(endOffsetHasGreaterOrEqualOffsetComparedToPrefetched,

Review Comment:
   Yeah... it's a bit tricky. The initial rationale for the assertion "was" to point out a bug quickly and let the streaming query fail fast rather than run infinitely. But it is also the user who can make arbitrary external changes to topic partitions during a Trigger.AvailableNow run and break the query.
   
   So we actually have two different audiences. If we consider only the possible-bug cases, we would need to leave this as it is, so that it is treated as an "INTERNAL ERROR". (Not sure we have to go with the error framework for this case as well. Maybe @MaxGekk ?) If not, we should probably change the error to leverage the error framework and not mark it as an internal error.
   
   My feeling is that it'd be rare for users to modify the topic during the query run, so it still makes sense to target internal errors first, but I'm OK either way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041864100


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   It's not safe to just pretend that Kafka resources won't change during the query run. If they do, we end up with unexpected behavior; e.g. with consumer-based grouping, polling from a non-existent topic partition may lead to a metadata fetch timeout.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041879197


##########
connector/kafka-0-10-sql/src/test/resources/error/kafka-error-classes.json:
##########
@@ -0,0 +1,26 @@
+{
+  "TOPIC_PARTITIONS_IN_END_OFFSET_ARE_NOT_SAME_WITH_PREFETCHED" : {

Review Comment:
   I'm not sure about the doc, since this is not part of Spark's centralized error classes. I intended to keep this one separate for the Kafka data source. The Kafka data source is considered a reference implementation, hence I wanted to see how a 3rd party can integrate with Spark's error class framework.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #38911: [SPARK-41387][SS] Assert current end offset from Kafka data source for Trigger.AvailableNow

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042078520


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map { case x =>
+      s"foo-$x"
+    }.toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // create partitions less than the kafka data source figured out as an end state
+          testUtils.createTopic(topic, partitions = 3)
+          // offset will keep the same
+          testUtils.sendMessages(topic, (0 until 15).map { case x =>
+            s"foo-$x"
+          }.toArray, Some(0))
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +

Review Comment:
   Could the new test case use `checkError()` to check `errorClass` and `parameters` like other places in Spark?
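   
   For illustration, a hedged sketch of what that could look like, assuming the streaming exception's cause implements SparkThrowable and that the parameter names below exist in kafka-error-classes.json — both are assumptions, not verified against the PR:
   
   ```scala
   import org.apache.spark.SparkThrowable
   
   // Reusing `exc` captured by the intercept in the test above.
   checkError(
     exception = exc.getCause.asInstanceOf[SparkThrowable],
     errorClass = "LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW",
     parameters = Map(
       "tpsForLatestOffset" -> "...", // hypothetical parameter names
       "tpsForEndOffset" -> "..."))
   ```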
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org