Posted to reviews@spark.apache.org by "micheal-o (via GitHub)" <gi...@apache.org> on 2024/02/22 22:19:41 UTC

[PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

micheal-o opened a new pull request, #45221:
URL: https://github.com/apache/spark/pull/45221

   ### What changes were proposed in this pull request?
   In the Kafka connector code, several places throw the Java **IllegalStateException** to report data loss while reading from Kafka. This change properly classifies those exceptions using the new error framework, introducing new error classes for Kafka data loss errors.
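   
   At a high level, call sites stop throwing `IllegalStateException` inline and instead pass a thunk that builds the classified exception. A simplified sketch of the pattern (it mirrors the diffs discussed in review; not the exact PR code):
   
   ```
   // Simplified sketch: callers supply how to build the classified error.
   // `failOnDataLoss` and `INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE` come from
   // the surrounding Kafka source code; exact signatures may differ.
   private def reportDataLoss(message: String, getException: () => Throwable): Unit = {
     if (failOnDataLoss) {
       throw getException()  // a classified exception built by QueryExecutionErrors
     } else {
       logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
     }
   }
   ```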
   
   
   ### Why are the changes needed?
   Migrates these exceptions to the new error framework for better, classified error messages.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Updated existing tests
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   




Re: [PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

Posted by "micheal-o (via GitHub)" <gi...@apache.org>.
micheal-o commented on PR #45221:
URL: https://github.com/apache/spark/pull/45221#issuecomment-1961897552

   cc @HeartSaVioR






Re: [PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45221:
URL: https://github.com/apache/spark/pull/45221#discussion_r1505349482


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala:
##########
@@ -92,13 +93,18 @@ class KafkaContinuousStream(
 
     val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
     if (deletedPartitions.nonEmpty) {
-      val message = if (
-        offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-        s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}"
-      } else {
-        s"$deletedPartitions are gone. Some data may have been missed."
-      }
-      reportDataLoss(message)
+      val (message, config) =
+        if (offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+          (s"$deletedPartitions are gone.${CUSTOM_GROUP_ID_ERROR_MESSAGE}",

Review Comment:
   nit: space between . and $



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala:
##########
@@ -444,30 +447,40 @@ private[kafka010] class KafkaOffsetReaderAdmin(
   override def getOffsetRangesFromResolvedOffsets(
       fromPartitionOffsets: PartitionOffsetMap,
       untilPartitionOffsets: PartitionOffsetMap,
-      reportDataLoss: String => Unit): Seq[KafkaOffsetRange] = {
+      reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
     // Find the new partitions, and get their earliest offsets
     val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
     val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
     if (newPartitionInitialOffsets.keySet != newPartitions) {
       // We cannot get from offsets for some partitions. It means they got deleted.
       val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
       reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
+        () =>
+          QueryExecutionErrors.initialOffsetNotFoundForPartitionsKafkaError(
+            deletedPartitions.toString))
     }
     logInfo(s"Partitions added: $newPartitionInitialOffsets")
     newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
       reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+        s"Added partition $p starts from $o instead of 0. Some data may have been missed",
+        () => QueryExecutionErrors.addedPartitionDoesNotStartFromZeroKafkaError(p.toString, o))
     }
 
     val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
     if (deletedPartitions.nonEmpty) {
-      val message = if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-        s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
-      } else {
-        s"$deletedPartitions are gone. Some data may have been missed."
-      }
-      reportDataLoss(message)
+      val (message, config) =
+        if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+          (s"$deletedPartitions are gone.${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",

Review Comment:
   nit: space between . and $







Re: [PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #45221:
URL: https://github.com/apache/spark/pull/45221#discussion_r1503422041


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -302,12 +302,12 @@ private[kafka010] class KafkaMicroBatchStream(
   }
 
   /**
-   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * If `failOnDataLoss` is true, this method will throw the exception.
    * Otherwise, just log a warning.
    */
-  private def reportDataLoss(message: String): Unit = {
+  private def reportDataLoss(message: String, getException: () => Throwable): Unit = {

Review Comment:
   Oh I see, I wasn't aware of that : ) Thanks for the pointer





Re: [PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

Posted by "micheal-o (via GitHub)" <gi...@apache.org>.
micheal-o commented on code in PR #45221:
URL: https://github.com/apache/spark/pull/45221#discussion_r1503219354


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -302,12 +302,12 @@ private[kafka010] class KafkaMicroBatchStream(
   }
 
   /**
-   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * If `failOnDataLoss` is true, this method will throw the exception.
    * Otherwise, just log a warning.
    */
-  private def reportDataLoss(message: String): Unit = {
+  private def reportDataLoss(message: String, getException: () => Throwable): Unit = {

Review Comment:
   This is Scala's call-by-name. I was going to use it, but saw that our style guide asks us to avoid it.
   
   > Avoid using call by name. Use () => T explicitly.
   
   https://github.com/databricks/scala-style-guide?tab=readme-ov-file#call-by-name
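   
   For reference, a minimal illustration of the difference (a hypothetical snippet, not from this PR):
   
   ```
   object CallByNameDemo {
     // Call-by-name: laziness is implicit in the parameter type; `ex` is only
     // evaluated if it is actually used.
     def byName(failFast: Boolean, ex: => Throwable): Unit =
       if (failFast) throw ex
   
     // Explicit thunk, as the style guide prefers: the deferred evaluation is
     // visible at both the declaration and the call site.
     def byThunk(failFast: Boolean, ex: () => Throwable): Unit =
       if (failFast) throw ex()
   
     def main(args: Array[String]): Unit = {
       val failFast = false
       // Neither call constructs the exception, since failFast is false.
       byName(failFast, new IllegalStateException("never built"))
       byThunk(failFast, () => new IllegalStateException("never built"))
     }
   }
   ```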



##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -2622,6 +2622,62 @@
     ],
     "sqlState" : "42K0E"
   },
+  "KAFKA_DATA_LOSS" : {
+    "message" : [
+      "Some data may have been lost because they are not available in Kafka any more;",
+      "either the data was aged out by Kafka or the topic may have been deleted before all the data in the",
+      "topic was processed.",
+      "If you don't want your streaming query to fail on such cases, set the source option failOnDataLoss to false.",
+      "Reason:"

Review Comment:
   Yeah, I copied the message from here and was going to remove it. Forgot. Will remove.





Re: [PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

Posted by "micheal-o (via GitHub)" <gi...@apache.org>.
micheal-o commented on PR #45221:
URL: https://github.com/apache/spark/pull/45221#issuecomment-1960619455

   cc @MaxGekk @srielau - I added a new error class




Re: [PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

Posted by "micheal-o (via GitHub)" <gi...@apache.org>.
micheal-o commented on PR #45221:
URL: https://github.com/apache/spark/pull/45221#issuecomment-1965540824

   @WweiL Good comments. Thanks for catching them.






Re: [PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on PR #45221:
URL: https://github.com/apache/spark/pull/45221#issuecomment-1960954082

   CI failures seem related: https://github.com/micheal-o/spark/actions/runs/8013249769/job/21890794902




Re: [PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #45221:
URL: https://github.com/apache/spark/pull/45221#discussion_r1500072399


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala:
##########
@@ -624,31 +627,48 @@ private[kafka010] class KafkaDataConsumer(
   /**
    * Return an addition message including useful message and instruction.
    */
-  private def additionalMessage(
+  private def additionalWarningMessage(
       topicPartition: TopicPartition,
-      groupId: String,
-      failOnDataLoss: Boolean): String = {
-    if (failOnDataLoss) {
-      s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
-        s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE"
-    } else {
-      s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
-        s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE"
-    }
+      groupId: String): String = {
+    s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
+      s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE"
   }
 
   /**
-   * Throw an exception or log a warning as per `failOnDataLoss`.
+   * Throw an exception when Data loss is detected.

Review Comment:
   nit: data loss





Re: [PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #45221:
URL: https://github.com/apache/spark/pull/45221#discussion_r1503124502


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -2622,6 +2622,62 @@
     ],
     "sqlState" : "42K0E"
   },
+  "KAFKA_DATA_LOSS" : {
+    "message" : [
+      "Some data may have been lost because they are not available in Kafka any more;",
+      "either the data was aged out by Kafka or the topic may have been deleted before all the data in the",
+      "topic was processed.",
+      "If you don't want your streaming query to fail on such cases, set the source option failOnDataLoss to false.",
+      "Reason:"

Review Comment:
   I think this replaces `INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE` defined in `KafkaSourceProvider`. Do we want to remove it, or is this a first step with other error classes depending on this one? 



##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -904,7 +904,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
         testUtils.sendMessages(topic2, Array("6"))
       },
       StartStream(),
-      ExpectFailure[IllegalStateException](e => {
+      ExpectFailure[SparkException](e => {

Review Comment:
   I'm hesitant to change the exception to SparkException here, for backward compatibility reasons.
   
   Existing code might expect an IllegalStateException and handle it accordingly. Can we define a class that extends IllegalStateException instead of using SparkException?
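   
   Something like this hypothetical sketch, say (the class name and helper usage are assumptions, not taken from the PR):
   
   ```
   import java.util.{Map => JMap}
   import scala.jdk.CollectionConverters._
   
   import org.apache.spark.{SparkThrowable, SparkThrowableHelper}
   
   // Hypothetical: still an IllegalStateException for existing handlers, but
   // also carries an error class the way other SparkThrowable impls do.
   class KafkaDataLossException(
       errorClass: String,
       messageParameters: Map[String, String])
     extends IllegalStateException(
       SparkThrowableHelper.getMessage(errorClass, messageParameters))
     with SparkThrowable {
   
     override def getErrorClass: String = errorClass
     override def getMessageParameters: JMap[String, String] = messageParameters.asJava
   }
   ```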



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala:
##########
@@ -92,13 +93,18 @@ class KafkaContinuousStream(
 
     val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
     if (deletedPartitions.nonEmpty) {
-      val message = if (
-        offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-        s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}"
+      if (offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+        reportDataLoss(
+          s"$deletedPartitions are gone.${CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+          () =>
+            QueryExecutionErrors.partitionsDeletedAndGroupIdConfigPresentKafkaError(
+              deletedPartitions.toString,
+              ConsumerConfig.GROUP_ID_CONFIG))
       } else {
-        s"$deletedPartitions are gone. Some data may have been missed."
+        reportDataLoss(
+          s"$deletedPartitions are gone. Some data may have been missed.",
+          () => QueryExecutionErrors.partitionsDeletedKafkaError(deletedPartitions.toString))

Review Comment:
   It took me a bit to understand this logic, and it appears more than once.
   
   Could we simplify it? There are two functions with similar roles: `partitionsDeletedKafkaError` and `partitionsDeletedAndGroupIdConfigPresentKafkaError`. If we merge them into one function `partitionsDeletedKafkaError(partitions: String, groupIdConfig: Option[String])` and match on `groupIdConfig`, we can set the correct error class and message parameters.
   
   Then we can clean up the logic here by doing something like:
   ```
   val message = if (...) ... else ...
   reportDataLoss(
     message,
     () => QueryExecutionErrors.partitionsDeletedKafkaError(
       deletedPartitions.toString,
       Option(offsetReader.driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG))))
   ```
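   
   The merged helper could then look roughly like this (a sketch; the error-class names below are placeholders, not necessarily the ones this PR adds):
   
   ```
   def partitionsDeletedKafkaError(
       partitions: String,
       groupIdConfig: Option[String]): Throwable = groupIdConfig match {
     case Some(groupId) =>
       // A custom group id is configured: call the config out in the error.
       new SparkException(
         errorClass = "KAFKA_DATA_LOSS.PARTITIONS_DELETED_AND_GROUP_ID_CONFIG_PRESENT",
         messageParameters = Map("partitions" -> partitions, "groupIdConfig" -> groupId),
         cause = null)
     case None =>
       new SparkException(
         errorClass = "KAFKA_DATA_LOSS.PARTITIONS_DELETED",
         messageParameters = Map("partitions" -> partitions),
         cause = null)
   }
   ```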



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -302,12 +302,12 @@ private[kafka010] class KafkaMicroBatchStream(
   }
 
   /**
-   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * If `failOnDataLoss` is true, this method will throw the exception.
    * Otherwise, just log a warning.
    */
-  private def reportDataLoss(message: String): Unit = {
+  private def reportDataLoss(message: String, getException: () => Throwable): Unit = {

Review Comment:
   I think the reason you are defining `getException` as `() => Throwable` is to avoid unneeded computation of `getException()` in the `else` branch?
   
   In that case I think this code would also work:
   
   ```
   private def reportDataLoss(message: String, getException: => Throwable): Unit = {
     if (failOnDataLoss) {
       throw getException
     } else {
       logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
     }
   }
   ```
   Some examples in the code base
   https://github.com/apache/spark/blob/b7763a7eae2b9609012cbb4ce981276c6471cc4e/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala#L59-L61





Re: [PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #45221: [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions
URL: https://github.com/apache/spark/pull/45221




Re: [PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45221:
URL: https://github.com/apache/spark/pull/45221#issuecomment-1970158777

   Thanks! Merging to master.






Re: [PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

Posted by "WweiL (via GitHub)" <gi...@apache.org>.
WweiL commented on code in PR #45221:
URL: https://github.com/apache/spark/pull/45221#discussion_r1500072188


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2715,4 +2715,70 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
         "numFields" -> numFields.toString,
         "schemaLen" -> schemaLen.toString))
   }
+
+  def couldNotReadOffsetRangeInKafkaError(startOffset: Long,
+                                          endOffset: Long,

Review Comment:
   nit: indentation : )
   
   > For method declarations, use 4 space indentation
   
   from https://github.com/databricks/scala-style-guide
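   
   i.e., something like this (parameters trimmed for illustration; the body is a placeholder):
   
   ```
   def couldNotReadOffsetRangeInKafkaError(
       startOffset: Long,
       endOffset: Long): Throwable = {
     // Only the 4-space parameter indentation matters here; the body is a stub.
     new IllegalStateException(s"cannot read offsets $startOffset to $endOffset")
   }
   ```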





Re: [PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

Posted by "micheal-o (via GitHub)" <gi...@apache.org>.
micheal-o commented on code in PR #45221:
URL: https://github.com/apache/spark/pull/45221#discussion_r1500078362


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2715,4 +2715,70 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
         "numFields" -> numFields.toString,
         "schemaLen" -> schemaLen.toString))
   }
+
+  def couldNotReadOffsetRangeInKafkaError(startOffset: Long,
+                                          endOffset: Long,

Review Comment:
   Yeah, I need to fix it everywhere. IntelliJ messed it up. Thanks





Re: [PR] [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions [spark]

Posted by "micheal-o (via GitHub)" <gi...@apache.org>.
micheal-o commented on PR #45221:
URL: https://github.com/apache/spark/pull/45221#issuecomment-1961832330

   Fixing SparkThrowableSuite failure

