You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2024/02/29 00:27:04 UTC

(spark) branch master updated: [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 36df0a63a139 [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions
36df0a63a139 is described below

commit 36df0a63a139704ccd2a344d057e430430b11ad8
Author: micheal-o <mi...@gmail.com>
AuthorDate: Thu Feb 29 09:26:47 2024 +0900

    [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions
    
    ### What changes were proposed in this pull request?
    In the Kafka connector code, several places throw the Java **IllegalStateException** to report data loss while reading from Kafka. This change properly classifies those exceptions using the new error framework. It adds a new exception type, `KafkaIllegalStateException`, that carries an error class, and introduces new error classes for Kafka data loss errors.
    
    ### Why are the changes needed?
    New error framework for better error messages
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, better error message with error class
    
    ### How was this patch tested?
    Updated existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45221 from micheal-o/bmo/IllegalStateEx.
    
    Lead-authored-by: micheal-o <mi...@gmail.com>
    Co-authored-by: micheal-o <mi...@databricks.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../main/resources/error/kafka-error-classes.json  |  56 +++++++++++
 .../spark/sql/kafka010/KafkaContinuousStream.scala |  26 ++---
 .../spark/sql/kafka010/KafkaExceptions.scala       | 109 +++++++++++++++++++--
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala |   6 +-
 .../spark/sql/kafka010/KafkaOffsetReader.scala     |   4 +-
 .../sql/kafka010/KafkaOffsetReaderAdmin.scala      |  42 +++++---
 .../sql/kafka010/KafkaOffsetReaderConsumer.scala   |  46 ++++++---
 .../apache/spark/sql/kafka010/KafkaSource.scala    |   6 +-
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |   8 --
 .../sql/kafka010/consumer/KafkaDataConsumer.scala  |  85 ++++++++--------
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |   2 +-
 .../sql/kafka010/KafkaOffsetReaderSuite.scala      |   2 +-
 .../kafka010/consumer/KafkaDataConsumerSuite.scala |  11 ++-
 13 files changed, 290 insertions(+), 113 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-classes.json b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-classes.json
index ea7ffb592a55..a7b22e1370fd 100644
--- a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-classes.json
+++ b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-classes.json
@@ -22,5 +22,61 @@
       "Some of partitions in Kafka topic(s) report available offset which is less than end offset during running query with Trigger.AvailableNow. The error could be transient - restart your query, and report if you still see the same issue.",
       "latest offset: <latestOffset>, end offset: <endOffset>"
     ]
+  },
+  "KAFKA_DATA_LOSS" : {
+    "message" : [
+      "Some data may have been lost because they are not available in Kafka any more;",
+      "either the data was aged out by Kafka or the topic may have been deleted before all the data in the",
+      "topic was processed.",
+      "If you don't want your streaming query to fail on such cases, set the source option failOnDataLoss to false.",
+      "Reason:"
+    ],
+    "subClass" : {
+      "ADDED_PARTITION_DOES_NOT_START_FROM_OFFSET_ZERO" : {
+        "message" : [
+          "Added partition <topicPartition> starts from <startOffset> instead of 0."
+        ]
+      },
+      "COULD_NOT_READ_OFFSET_RANGE" : {
+        "message" : [
+          "Could not read records in offset [<startOffset>, <endOffset>) for topic partition <topicPartition>",
+          "with consumer group <groupId>."
+        ]
+      },
+      "INITIAL_OFFSET_NOT_FOUND_FOR_PARTITIONS" : {
+        "message" : [
+          "Cannot find initial offsets for partitions <partitions>. They may have been deleted."
+        ]
+      },
+      "PARTITIONS_DELETED" : {
+        "message" : [
+          "Partitions <partitions> have been deleted."
+        ]
+      },
+      "PARTITIONS_DELETED_AND_GROUP_ID_CONFIG_PRESENT" : {
+        "message" : [
+          "Partitions <partitions> have been deleted.",
+          "Kafka option 'kafka.<groupIdConfig>' has been set on this query, it is",
+          "not recommended to set this option. This option is unsafe to use since multiple concurrent",
+          "queries or sources using the same group id will interfere with each other as they are part",
+          "of the same consumer group. Restarted queries may also suffer interference from the",
+          "previous run having the same group id. The user should have only one query per group id,",
+          "and/or set the option 'kafka.session.timeout.ms' to be very small so that the Kafka",
+          "consumers from the previous query are marked dead by the Kafka group coordinator before the",
+          "restarted query starts running."
+        ]
+      },
+      "PARTITION_OFFSET_CHANGED" : {
+        "message" : [
+          "Partition <topicPartition> offset was changed from <prevOffset> to <newOffset>."
+        ]
+      },
+      "START_OFFSET_RESET" : {
+        "message" : [
+          "Starting offset for <topicPartition> was <offset> but consumer reset to <fetchedOffset>."
+        ]
+      }
+    },
+    "sqlState" : "22000"
   }
 }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index a86acd971a1c..9b7f52585545 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -92,13 +92,17 @@ class KafkaContinuousStream(
 
     val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
     if (deletedPartitions.nonEmpty) {
-      val message = if (
-        offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-        s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}"
-      } else {
-        s"$deletedPartitions are gone. Some data may have been missed."
-      }
-      reportDataLoss(message)
+      val (message, config) =
+        if (offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+          (s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+            Some(ConsumerConfig.GROUP_ID_CONFIG))
+        } else {
+          (s"$deletedPartitions are gone. Some data may have been missed.", None)
+        }
+
+      reportDataLoss(
+        message,
+        () => KafkaExceptions.partitionsDeleted(deletedPartitions, config))
     }
 
     val startOffsets = newPartitionOffsets ++
@@ -137,12 +141,12 @@ class KafkaContinuousStream(
   override def toString(): String = s"KafkaSource[$offsetReader]"
 
   /**
-   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * If `failOnDataLoss` is true, this method will throw the exception produced by `getException`.
    * Otherwise, just log a warning.
    */
-  private def reportDataLoss(message: String): Unit = {
+  private def reportDataLoss(message: String, getException: () => Throwable): Unit = {
     if (failOnDataLoss) {
-      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+      throw getException()
     } else {
       logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
     }
@@ -221,7 +225,7 @@ class KafkaContinuousPartitionReader(
 
         // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
         // or if it's the endpoint of the data range (i.e. the "true" next offset).
-        case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
+        case e: KafkaIllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
           val range = consumer.getAvailableOffsetRange()
           if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
             // retry
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
index b0e30f37af51..300d288507a3 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
@@ -17,19 +17,23 @@
 
 package org.apache.spark.sql.kafka010
 
+import scala.jdk.CollectionConverters._
+
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{ErrorClassesJsonReader, SparkException}
+import org.apache.spark.{ErrorClassesJsonReader, SparkException, SparkThrowable}
 
-object KafkaExceptions {
-  private val errorClassesJsonReader: ErrorClassesJsonReader =
+private object KafkaExceptionsHelper {
+  val errorClassesJsonReader: ErrorClassesJsonReader =
     new ErrorClassesJsonReader(
       Seq(getClass.getClassLoader.getResource("error/kafka-error-classes.json")))
+}
 
+object KafkaExceptions {
   def mismatchedTopicPartitionsBetweenEndOffsetAndPrefetched(
       tpsForPrefetched: Set[TopicPartition],
       tpsForEndOffset: Set[TopicPartition]): SparkException = {
-    val errMsg = errorClassesJsonReader.getErrorMessage(
+    val errMsg = KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
       "MISMATCHED_TOPIC_PARTITIONS_BETWEEN_END_OFFSET_AND_PREFETCHED",
       Map(
         "tpsForPrefetched" -> tpsForPrefetched.toString(),
@@ -42,7 +46,7 @@ object KafkaExceptions {
   def endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
       prefetchedOffset: Map[TopicPartition, Long],
       endOffset: Map[TopicPartition, Long]): SparkException = {
-    val errMsg = errorClassesJsonReader.getErrorMessage(
+    val errMsg = KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
       "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED",
       Map(
         "prefetchedOffset" -> prefetchedOffset.toString(),
@@ -55,7 +59,7 @@ object KafkaExceptions {
   def lostTopicPartitionsInEndOffsetWithTriggerAvailableNow(
       tpsForLatestOffset: Set[TopicPartition],
       tpsForEndOffset: Set[TopicPartition]): SparkException = {
-    val errMsg = errorClassesJsonReader.getErrorMessage(
+    val errMsg = KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
       "LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW",
       Map(
         "tpsForLatestOffset" -> tpsForLatestOffset.toString(),
@@ -68,7 +72,7 @@ object KafkaExceptions {
   def endOffsetHasGreaterOffsetForTopicPartitionThanLatestWithTriggerAvailableNow(
       latestOffset: Map[TopicPartition, Long],
       endOffset: Map[TopicPartition, Long]): SparkException = {
-    val errMsg = errorClassesJsonReader.getErrorMessage(
+    val errMsg = KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
       "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_LATEST_WITH_TRIGGER_AVAILABLENOW",
       Map(
         "latestOffset" -> latestOffset.toString(),
@@ -77,4 +81,95 @@ object KafkaExceptions {
     )
     new SparkException(errMsg)
   }
+
+  def couldNotReadOffsetRange(
+      startOffset: Long,
+      endOffset: Long,
+      topicPartition: TopicPartition,
+      groupId: String,
+      cause: Throwable): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "KAFKA_DATA_LOSS.COULD_NOT_READ_OFFSET_RANGE",
+      messageParameters = Map(
+        "startOffset" -> startOffset.toString,
+        "endOffset" -> endOffset.toString,
+        "topicPartition" -> topicPartition.toString,
+        "groupId" -> groupId),
+      cause = cause)
+  }
+
+  def startOffsetReset(
+      topicPartition: TopicPartition,
+      offset: Long,
+      fetchedOffset: Long): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "KAFKA_DATA_LOSS.START_OFFSET_RESET",
+      messageParameters = Map(
+        "topicPartition" -> topicPartition.toString,
+        "offset" -> offset.toString,
+        "fetchedOffset" -> fetchedOffset.toString))
+  }
+
+  def initialOffsetNotFoundForPartitions(
+      partitions: Set[TopicPartition]): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "KAFKA_DATA_LOSS.INITIAL_OFFSET_NOT_FOUND_FOR_PARTITIONS",
+      messageParameters = Map("partitions" -> partitions.toString))
+  }
+
+  def addedPartitionDoesNotStartFromZero(
+      topicPartition: TopicPartition,
+      startOffset: Long): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "KAFKA_DATA_LOSS.ADDED_PARTITION_DOES_NOT_START_FROM_OFFSET_ZERO",
+      messageParameters =
+        Map("topicPartition" -> topicPartition.toString, "startOffset" -> startOffset.toString))
+  }
+
+  def partitionsDeleted(
+      partitions: Set[TopicPartition],
+      groupIdConfigName: Option[String]): KafkaIllegalStateException = {
+    groupIdConfigName match {
+      case Some(config) =>
+        new KafkaIllegalStateException(
+          errorClass = "KAFKA_DATA_LOSS.PARTITIONS_DELETED_AND_GROUP_ID_CONFIG_PRESENT",
+          messageParameters = Map("partitions" -> partitions.toString, "groupIdConfig" -> config))
+      case None =>
+        new KafkaIllegalStateException(
+          errorClass = "KAFKA_DATA_LOSS.PARTITIONS_DELETED",
+          messageParameters = Map("partitions" -> partitions.toString))
+    }
+  }
+
+  def partitionOffsetChanged(
+      topicPartition: TopicPartition,
+      prevOffset: Long,
+      newOffset: Long): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "KAFKA_DATA_LOSS.PARTITION_OFFSET_CHANGED",
+      messageParameters = Map(
+        "topicPartition" -> topicPartition.toString,
+        "prevOffset" -> prevOffset.toString,
+        "newOffset" -> newOffset.toString))
+  }
+}
+
+/**
+ * Illegal state exception thrown with an error class.
+ */
+private[kafka010] class KafkaIllegalStateException(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    cause: Throwable = null)
+  extends IllegalStateException(
+    KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
+      errorClass, messageParameters), cause)
+  with SparkThrowable {
+
+  override def getSqlState: String =
+    KafkaExceptionsHelper.errorClassesJsonReader.getSqlState(errorClass)
+
+  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava
+
+  override def getErrorClass: String = errorClass
 }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index e92ebecfce08..fefa3efcc353 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -302,12 +302,12 @@ private[kafka010] class KafkaMicroBatchStream(
   }
 
   /**
-   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * If `failOnDataLoss` is true, this method will throw the exception produced by `getException`.
    * Otherwise, just log a warning.
    */
-  private def reportDataLoss(message: String): Unit = {
+  private def reportDataLoss(message: String, getException: () => Throwable): Unit = {
     if (failOnDataLoss) {
-      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+      throw getException()
     } else {
       logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
     }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index b1370e10501f..df0c7e9c0425 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -62,7 +62,7 @@ private[kafka010] trait KafkaOffsetReader {
    */
   def fetchSpecificOffsets(
       partitionOffsets: Map[TopicPartition, Long],
-      reportDataLoss: String => Unit): KafkaSourceOffset
+      reportDataLoss: (String, () => Throwable) => Unit): KafkaSourceOffset
 
   /**
    * Resolves the specific offsets based on timestamp per topic-partition.
@@ -147,7 +147,7 @@ private[kafka010] trait KafkaOffsetReader {
   def getOffsetRangesFromResolvedOffsets(
       fromPartitionOffsets: PartitionOffsetMap,
       untilPartitionOffsets: PartitionOffsetMap,
-      reportDataLoss: String => Unit): Seq[KafkaOffsetRange]
+      reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange]
 }
 
 private[kafka010] object KafkaOffsetReader extends Logging {
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
index 9206dfe9b3f2..27adccf6f902 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
@@ -148,7 +148,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
 
   override def fetchSpecificOffsets(
       partitionOffsets: Map[TopicPartition, Long],
-      reportDataLoss: String => Unit): KafkaSourceOffset = {
+      reportDataLoss: (String, () => Throwable) => Unit): KafkaSourceOffset = {
     val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
       assert(partitions.asScala == partitionOffsets.keySet,
         "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
@@ -404,8 +404,10 @@ private[kafka010] class KafkaOffsetReaderAdmin(
         offsetRangesBase.map(range => (range.topicPartition, range.untilOffset)).toMap
 
       // No need to report data loss here
-      val resolvedFromOffsets = fetchSpecificOffsets(fromOffsetsMap, _ => ()).partitionToOffsets
-      val resolvedUntilOffsets = fetchSpecificOffsets(untilOffsetsMap, _ => ()).partitionToOffsets
+      val resolvedFromOffsets =
+        fetchSpecificOffsets(fromOffsetsMap, (_, _) => ()).partitionToOffsets
+      val resolvedUntilOffsets =
+        fetchSpecificOffsets(untilOffsetsMap, (_, _) => ()).partitionToOffsets
       val ranges = offsetRangesBase.map(_.topicPartition).map { tp =>
         KafkaOffsetRange(tp, resolvedFromOffsets(tp), resolvedUntilOffsets(tp), preferredLoc = None)
       }
@@ -444,7 +446,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
   override def getOffsetRangesFromResolvedOffsets(
       fromPartitionOffsets: PartitionOffsetMap,
       untilPartitionOffsets: PartitionOffsetMap,
-      reportDataLoss: String => Unit): Seq[KafkaOffsetRange] = {
+      reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
     // Find the new partitions, and get their earliest offsets
     val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
     val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
@@ -452,22 +454,31 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       // We cannot get from offsets for some partitions. It means they got deleted.
       val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
       reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
+        () =>
+          KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
     }
     logInfo(s"Partitions added: $newPartitionInitialOffsets")
     newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
       reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+        s"Added partition $p starts from $o instead of 0. Some data may have been missed",
+        () => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
     }
 
     val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
     if (deletedPartitions.nonEmpty) {
-      val message = if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-        s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
-      } else {
-        s"$deletedPartitions are gone. Some data may have been missed."
-      }
-      reportDataLoss(message)
+      val (message, config) =
+        if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+          (s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+            Some(ConsumerConfig.GROUP_ID_CONFIG))
+        } else {
+          (s"$deletedPartitions are gone. Some data may have been missed.", None)
+        }
+
+      reportDataLoss(
+        message,
+        () =>
+          KafkaExceptions.partitionsDeleted(deletedPartitions, config))
     }
 
     // Use the until partitions to calculate offset ranges to ignore partitions that have
@@ -484,8 +495,11 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       val fromOffset = fromOffsets(tp)
       val untilOffset = untilOffsets(tp)
       if (untilOffset < fromOffset) {
-        reportDataLoss(s"Partition $tp's offset was changed from " +
-          s"$fromOffset to $untilOffset, some data may have been missed")
+        reportDataLoss(
+          s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset, some data may have been missed",
+          () =>
+            KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
       }
       KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
     }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
index a859cd3d55a9..d4953a4a65e3 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
@@ -170,7 +170,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
 
   override def fetchSpecificOffsets(
       partitionOffsets: Map[TopicPartition, Long],
-      reportDataLoss: String => Unit): KafkaSourceOffset = {
+      reportDataLoss: (String, () => Throwable) => Unit): KafkaSourceOffset = {
     val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
       assert(partitions.asScala == partitionOffsets.keySet,
         "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
@@ -189,7 +189,9 @@ private[kafka010] class KafkaOffsetReaderConsumer(
           off != KafkaOffsetRangeLimit.EARLIEST =>
           if (fetched(tp) != off) {
             reportDataLoss(
-              s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}")
+              s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}",
+              () =>
+                KafkaExceptions.startOffsetReset(tp, off, fetched(tp)))
           }
         case _ =>
         // no real way to check that beginning or end is reasonable
@@ -451,8 +453,10 @@ private[kafka010] class KafkaOffsetReaderConsumer(
         offsetRangesBase.map(range => (range.topicPartition, range.untilOffset)).toMap
 
       // No need to report data loss here
-      val resolvedFromOffsets = fetchSpecificOffsets(fromOffsetsMap, _ => ()).partitionToOffsets
-      val resolvedUntilOffsets = fetchSpecificOffsets(untilOffsetsMap, _ => ()).partitionToOffsets
+      val resolvedFromOffsets =
+        fetchSpecificOffsets(fromOffsetsMap, (_, _) => ()).partitionToOffsets
+      val resolvedUntilOffsets =
+        fetchSpecificOffsets(untilOffsetsMap, (_, _) => ()).partitionToOffsets
       val ranges = offsetRangesBase.map(_.topicPartition).map { tp =>
         KafkaOffsetRange(tp, resolvedFromOffsets(tp), resolvedUntilOffsets(tp), preferredLoc = None)
       }
@@ -491,7 +495,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
   override def getOffsetRangesFromResolvedOffsets(
       fromPartitionOffsets: PartitionOffsetMap,
       untilPartitionOffsets: PartitionOffsetMap,
-      reportDataLoss: String => Unit): Seq[KafkaOffsetRange] = {
+      reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
     // Find the new partitions, and get their earliest offsets
     val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
     val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
@@ -499,22 +503,31 @@ private[kafka010] class KafkaOffsetReaderConsumer(
       // We cannot get from offsets for some partitions. It means they got deleted.
       val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
       reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
+        () =>
+          KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
     }
     logInfo(s"Partitions added: $newPartitionInitialOffsets")
     newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
       reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+        s"Added partition $p starts from $o instead of 0. Some data may have been missed",
+        () => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
     }
 
     val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
     if (deletedPartitions.nonEmpty) {
-      val message = if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-        s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
-      } else {
-        s"$deletedPartitions are gone. Some data may have been missed."
-      }
-      reportDataLoss(message)
+      val (message, config) =
+        if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+          (s"$deletedPartitions are gone.${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+            Some(ConsumerConfig.GROUP_ID_CONFIG))
+        } else {
+          (s"$deletedPartitions are gone. Some data may have been missed.", None)
+        }
+
+      reportDataLoss(
+        message,
+        () =>
+          KafkaExceptions.partitionsDeleted(deletedPartitions, config))
     }
 
     // Use the until partitions to calculate offset ranges to ignore partitions that have
@@ -531,8 +544,11 @@ private[kafka010] class KafkaOffsetReaderConsumer(
       val fromOffset = fromOffsets(tp)
       val untilOffset = untilOffsets(tp)
       if (untilOffset < fromOffset) {
-        reportDataLoss(s"Partition $tp's offset was changed from " +
-          s"$fromOffset to $untilOffset, some data may have been missed")
+        reportDataLoss(
+          s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset, some data may have been missed",
+          () =>
+            KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
       }
       KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
     }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 6f6d6319cd6f..83ed7fff23fc 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -344,12 +344,12 @@ private[kafka010] class KafkaSource(
   override def toString(): String = s"KafkaSourceV1[$kafkaReader]"
 
   /**
-   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * If `failOnDataLoss` is true, this method will throw the exception produced by `getException`.
    * Otherwise, just log a warning.
    */
-  private def reportDataLoss(message: String): Unit = {
+  private def reportDataLoss(message: String, getException: () => Throwable): Unit = {
     if (failOnDataLoss) {
-      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+      throw getException()
     } else {
       logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
     }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 73446eddd25f..a5fc00ff29ff 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -585,14 +585,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       | option "failOnDataLoss" to "true".
     """.stripMargin
 
-  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE =
-    """
-      |Some data may have been lost because they are not available in Kafka any more; either the
-      | data was aged out by Kafka or the topic may have been deleted before all the data in the
-      | topic was processed. If you don't want your streaming query to fail on such cases, set the
-      | source option "failOnDataLoss" to "false".
-    """.stripMargin
-
   val CUSTOM_GROUP_ID_ERROR_MESSAGE =
     s"""Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set on this query, it is
        | not recommended to set this option. This option is unsafe to use since multiple concurrent
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index 2bd883a3cd51..fbc4a500322e 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenUtil}
+import org.apache.spark.sql.kafka010.KafkaExceptions
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
 import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
@@ -340,8 +341,11 @@ private[kafka010] class KafkaDataConsumer(
           releaseConsumer()
           fetchedData.reset()
 
-          reportDataLoss(topicPartition, groupId, failOnDataLoss,
-            s"Cannot fetch offset $toFetchOffset", e)
+          if (failOnDataLoss) {
+            throwOnDataLoss(toFetchOffset, untilOffset, topicPartition, groupId, e)
+          } else {
+            logOnDataLoss(topicPartition, groupId, s"Cannot fetch offset $toFetchOffset", e)
+          }
 
           val oldToFetchOffsetd = toFetchOffset
           toFetchOffset = getEarliestAvailableOffsetBetween(consumer, toFetchOffset, untilOffset)
@@ -443,7 +447,7 @@ private[kafka010] class KafkaDataConsumer(
       s"""
          |The current available offset range is $range.
          | Offset $offset is out of range, and records in [$offset, $untilOffset) will be
-         | skipped ${additionalMessage(topicPartition, groupId, failOnDataLoss = false)}
+         | skipped ${additionalWarningMessage(topicPartition, groupId)}
         """.stripMargin
       logWarning(warningMessage)
       UNKNOWN_OFFSET
@@ -457,7 +461,7 @@ private[kafka010] class KafkaDataConsumer(
       // then we will see `offset` disappears first then appears again. Although the parameters
       // are same, the state in Kafka cluster is changed, so the outer loop won't be endless.
       logWarning(s"Found a disappeared offset $offset. Some data may be lost " +
-        s"${additionalMessage(topicPartition, groupId, failOnDataLoss = false)}")
+        s"${additionalWarningMessage(topicPartition, groupId)}")
       offset
     } else {
       // ------------------------------------------------------------------------------
@@ -468,7 +472,7 @@ private[kafka010] class KafkaDataConsumer(
       s"""
          |The current available offset range is $range.
          | Offset ${offset} is out of range, and records in [$offset, ${range.earliest}) will be
-         | skipped ${additionalMessage(topicPartition, groupId, failOnDataLoss = false)}
+         | skipped ${additionalWarningMessage(topicPartition, groupId)}
         """.stripMargin
       logWarning(warningMessage)
       range.earliest
@@ -535,18 +539,17 @@ private[kafka010] class KafkaDataConsumer(
         }
         // This may happen when some records aged out but their offsets already got verified
         if (failOnDataLoss) {
-          reportDataLoss(consumer.topicPartition, consumer.groupId, failOnDataLoss = true,
-            s"Cannot fetch records in [$offset, ${record.offset})")
+          throwOnDataLoss(offset, record.offset, consumer.topicPartition, consumer.groupId)
           // Never reached: "throwOnDataLoss" always throws an exception
           throw new IllegalStateException(
             "reportDataLoss didn't throw an exception when 'failOnDataLoss' is true")
         } else if (record.offset >= untilOffset) {
-          reportDataLoss(consumer.topicPartition, consumer.groupId, failOnDataLoss = false,
+          logOnDataLoss(consumer.topicPartition, consumer.groupId,
             s"Skip missing records in [$offset, $untilOffset)")
           // Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
           fetchedRecord.withRecord(null, untilOffset)
         } else {
-          reportDataLoss(consumer.topicPartition, consumer.groupId, failOnDataLoss = false,
+          logOnDataLoss(consumer.topicPartition, consumer.groupId,
             s"Skip missing records in [$offset, ${record.offset})")
           fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
         }
@@ -624,31 +627,47 @@ private[kafka010] class KafkaDataConsumer(
   /**
    * Return an additional message including useful context and instructions.
    */
-  private def additionalMessage(
+  private def additionalWarningMessage(
+      topicPartition: TopicPartition,
+      groupId: String): String = {
+    s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
+      s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE"
+  }
+
+  /**
+   * Throw an exception when data loss is detected.
+   */
+  private def throwOnDataLoss(
+      startOffset: Long,
+      endOffset: Long,
       topicPartition: TopicPartition,
       groupId: String,
-      failOnDataLoss: Boolean): String = {
-    if (failOnDataLoss) {
-      s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
-        s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE"
-    } else {
-      s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
-        s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE"
-    }
+      cause: Throwable = null): Unit = {
+    dataLoss += 1
+    throw KafkaExceptions.couldNotReadOffsetRange(
+      startOffset,
+      endOffset,
+      topicPartition,
+      groupId,
+      cause)
   }
 
   /**
-   * Throw an exception or log a warning as per `failOnDataLoss`.
+   * Log a warning when data loss is detected.
    */
-  private def reportDataLoss(
+  private def logOnDataLoss(
       topicPartition: TopicPartition,
       groupId: String,
-      failOnDataLoss: Boolean,
       message: String,
       cause: Throwable = null): Unit = {
-    val finalMessage = s"$message ${additionalMessage(topicPartition, groupId, failOnDataLoss)}"
+    val finalMessage = s"$message ${additionalWarningMessage(topicPartition, groupId)}"
+
     dataLoss += 1
-    reportDataLoss0(failOnDataLoss, finalMessage, cause)
+    if (cause != null) {
+      logWarning(finalMessage, cause)
+    } else {
+      logWarning(finalMessage)
+    }
   }
 
   private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match {
@@ -714,24 +733,4 @@ private[kafka010] object KafkaDataConsumer extends Logging {
 
     new KafkaDataConsumer(topicPartition, kafkaParams, consumerPool, fetchedDataPool)
   }
-
-  private def reportDataLoss0(
-      failOnDataLoss: Boolean,
-      finalMessage: String,
-      cause: Throwable = null): Unit = {
-    if (failOnDataLoss) {
-      if (cause != null) {
-        throw new IllegalStateException(finalMessage, cause)
-      } else {
-        throw new IllegalStateException(finalMessage)
-      }
-    } else {
-      if (cause != null) {
-        logWarning(finalMessage, cause)
-      } else {
-        logWarning(finalMessage)
-      }
-    }
-  }
-
 }
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index fb5e71a1e7b8..9ae6a9290f80 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -904,7 +904,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
         testUtils.sendMessages(topic2, Array("6"))
       },
       StartStream(),
-      ExpectFailure[IllegalStateException](e => {
+      ExpectFailure[KafkaIllegalStateException](e => {
         // The offset of `topic2` should be changed from 2 to 1
         assert(e.getMessage.contains("was changed from 2 to 1"))
       })
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
index 332db5483b83..691e81f02a8c 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
@@ -170,7 +170,7 @@ class KafkaOffsetReaderSuite extends QueryTest with SharedSparkSession with Kafk
     val offsetRanges = reader.getOffsetRangesFromResolvedOffsets(
       fromPartitionOffsets,
       untilPartitionOffsets,
-      _ => {})
+      (_, _) => {})
     assert(offsetRanges.sortBy(_.topicPartition.toString) === Seq(
       KafkaOffsetRange(tp1, 0, 33, None),
       KafkaOffsetRange(tp1, 33, 66, None),
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
index b6748d0f261e..dc3319499982 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
@@ -32,7 +32,7 @@ import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.{TaskContext, TaskContextImpl}
 import org.apache.spark.kafka010.KafkaDelegationTokenTest
-import org.apache.spark.sql.kafka010.{KafkaTestUtils, RecordBuilder}
+import org.apache.spark.sql.kafka010.{KafkaIllegalStateException, KafkaTestUtils, RecordBuilder}
 import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.ResetSystemProperties
@@ -91,11 +91,12 @@ class KafkaDataConsumerSuite
     consumerPool.reset()
   }
 
-  test("SPARK-19886: Report error cause correctly in reportDataLoss") {
+  test("SPARK-19886: Report error cause correctly in throwOnDataLoss") {
     val cause = new Exception("D'oh!")
-    val reportDataLoss = PrivateMethod[Unit](Symbol("reportDataLoss0"))
-    val e = intercept[IllegalStateException] {
-      KafkaDataConsumer.invokePrivate(reportDataLoss(true, "message", cause))
+    val throwOnDataLoss = PrivateMethod[Unit](Symbol("throwOnDataLoss"))
+    val consumer = KafkaDataConsumer.acquire(topicPartition, getKafkaParams())
+    val e = intercept[KafkaIllegalStateException] {
+      consumer.invokePrivate(throwOnDataLoss(0L, 1L, topicPartition, groupId, cause))
     }
     assert(e.getCause === cause)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org