Posted to commits@spark.apache.org by ka...@apache.org on 2024/02/29 00:27:04 UTC
(spark) branch master updated: [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 36df0a63a139 [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions
36df0a63a139 is described below
commit 36df0a63a139704ccd2a344d057e430430b11ad8
Author: micheal-o <mi...@gmail.com>
AuthorDate: Thu Feb 29 09:26:47 2024 +0900
[SPARK-47135][SS] Implement error classes for Kafka data loss exceptions
### What changes were proposed in this pull request?
In the Kafka connector code, several places throw the Java **IllegalStateException** to report data loss while reading from Kafka. This change properly classifies those exceptions using the new error framework. It adds a new exception type `KafkaIllegalStateException` that accepts an error class, and introduces new error classes for the Kafka data loss errors.
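As a rough, hypothetical sketch of the user-facing effect (the query setup is assumed; `SparkThrowable`, `getErrorClass`, and the `KAFKA_DATA_LOSS.*` classes come from this change), a data loss failure can now be classified programmatically instead of matching on a bare `IllegalStateException` message:

// Hypothetical sketch: classifying a Kafka data loss failure by error class.
// 'query' is an assumed running streaming query reading from Kafka with failOnDataLoss=true.
try {
  query.processAllAvailable()
} catch {
  case e: org.apache.spark.SparkThrowable if e.getErrorClass.startsWith("KAFKA_DATA_LOSS") =>
    // e.g. "KAFKA_DATA_LOSS.PARTITIONS_DELETED"; templated parameters are exposed as a map
    println(s"Data loss (${e.getErrorClass}): ${e.getMessageParameters}")
}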
### Why are the changes needed?
Adopting the new error framework gives better, classified error messages.
### Does this PR introduce _any_ user-facing change?
Yes, users get better error messages carrying an error class.
### How was this patch tested?
Updated existing tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #45221 from micheal-o/bmo/IllegalStateEx.
Lead-authored-by: micheal-o <mi...@gmail.com>
Co-authored-by: micheal-o <mi...@databricks.com>
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
.../main/resources/error/kafka-error-classes.json | 56 +++++++++++
.../spark/sql/kafka010/KafkaContinuousStream.scala | 26 ++---
.../spark/sql/kafka010/KafkaExceptions.scala | 109 +++++++++++++++++++--
.../spark/sql/kafka010/KafkaMicroBatchStream.scala | 6 +-
.../spark/sql/kafka010/KafkaOffsetReader.scala | 4 +-
.../sql/kafka010/KafkaOffsetReaderAdmin.scala | 42 +++++---
.../sql/kafka010/KafkaOffsetReaderConsumer.scala | 46 ++++++---
.../apache/spark/sql/kafka010/KafkaSource.scala | 6 +-
.../spark/sql/kafka010/KafkaSourceProvider.scala | 8 --
.../sql/kafka010/consumer/KafkaDataConsumer.scala | 85 ++++++++--------
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 2 +-
.../sql/kafka010/KafkaOffsetReaderSuite.scala | 2 +-
.../kafka010/consumer/KafkaDataConsumerSuite.scala | 11 ++-
13 files changed, 290 insertions(+), 113 deletions(-)
diff --git a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-classes.json b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-classes.json
index ea7ffb592a55..a7b22e1370fd 100644
--- a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-classes.json
+++ b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-classes.json
@@ -22,5 +22,61 @@
"Some of partitions in Kafka topic(s) report available offset which is less than end offset during running query with Trigger.AvailableNow. The error could be transient - restart your query, and report if you still see the same issue.",
"latest offset: <latestOffset>, end offset: <endOffset>"
]
+ },
+ "KAFKA_DATA_LOSS" : {
+ "message" : [
+ "Some data may have been lost because they are not available in Kafka any more;",
+ "either the data was aged out by Kafka or the topic may have been deleted before all the data in the",
+ "topic was processed.",
+ "If you don't want your streaming query to fail on such cases, set the source option failOnDataLoss to false.",
+ "Reason:"
+ ],
+ "subClass" : {
+ "ADDED_PARTITION_DOES_NOT_START_FROM_OFFSET_ZERO" : {
+ "message" : [
+ "Added partition <topicPartition> starts from <startOffset> instead of 0."
+ ]
+ },
+ "COULD_NOT_READ_OFFSET_RANGE" : {
+ "message" : [
+ "Could not read records in offset [<startOffset>, <endOffset>) for topic partition <topicPartition>",
+ "with consumer group <groupId>."
+ ]
+ },
+ "INITIAL_OFFSET_NOT_FOUND_FOR_PARTITIONS" : {
+ "message" : [
+ "Cannot find initial offsets for partitions <partitions>. They may have been deleted."
+ ]
+ },
+ "PARTITIONS_DELETED" : {
+ "message" : [
+ "Partitions <partitions> have been deleted."
+ ]
+ },
+ "PARTITIONS_DELETED_AND_GROUP_ID_CONFIG_PRESENT" : {
+ "message" : [
+ "Partitions <partitions> have been deleted.",
+ "Kafka option 'kafka.<groupIdConfig>' has been set on this query, it is",
+ "not recommended to set this option. This option is unsafe to use since multiple concurrent",
+ "queries or sources using the same group id will interfere with each other as they are part",
+ "of the same consumer group. Restarted queries may also suffer interference from the",
+ "previous run having the same group id. The user should have only one query per group id,",
+ "and/or set the option 'kafka.session.timeout.ms' to be very small so that the Kafka",
+ "consumers from the previous query are marked dead by the Kafka group coordinator before the",
+ "restarted query starts running."
+ ]
+ },
+ "PARTITION_OFFSET_CHANGED" : {
+ "message" : [
+ "Partition <topicPartition> offset was changed from <prevOffset> to <newOffset>."
+ ]
+ },
+ "START_OFFSET_RESET" : {
+ "message" : [
+ "Starting offset for <topicPartition> was <offset> but consumer reset to <fetchedOffset>."
+ ]
+ }
+ },
+ "sqlState" : "22000"
}
}
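For reference, a minimal sketch of how these templates are turned into messages; the reader construction and getErrorMessage call mirror the KafkaExceptions code later in this diff, while the parameter values here are made up and the exact rendered wording depends on the error framework:

import org.apache.spark.ErrorClassesJsonReader

// Sketch: render a KAFKA_DATA_LOSS sub-class message from the JSON templates above.
val reader = new ErrorClassesJsonReader(
  Seq(getClass.getClassLoader.getResource("error/kafka-error-classes.json")))
val msg = reader.getErrorMessage(
  "KAFKA_DATA_LOSS.PARTITION_OFFSET_CHANGED",
  Map("topicPartition" -> "topic-1-0", "prevOffset" -> "2", "newOffset" -> "1"))
// msg contains the filled-in template, e.g. the KAFKA_DATA_LOSS preamble followed by
// "Partition topic-1-0 offset was changed from 2 to 1."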
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index a86acd971a1c..9b7f52585545 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -92,13 +92,17 @@ class KafkaContinuousStream(
val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
if (deletedPartitions.nonEmpty) {
- val message = if (
- offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
- s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}"
- } else {
- s"$deletedPartitions are gone. Some data may have been missed."
- }
- reportDataLoss(message)
+ val (message, config) =
+ if (offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+ (s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+ Some(ConsumerConfig.GROUP_ID_CONFIG))
+ } else {
+ (s"$deletedPartitions are gone. Some data may have been missed.", None)
+ }
+
+ reportDataLoss(
+ message,
+ () => KafkaExceptions.partitionsDeleted(deletedPartitions, config))
}
val startOffsets = newPartitionOffsets ++
@@ -137,12 +141,12 @@ class KafkaContinuousStream(
override def toString(): String = s"KafkaSource[$offsetReader]"
/**
- * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+ * If `failOnDataLoss` is true, this method will throw the exception.
* Otherwise, just log a warning.
*/
- private def reportDataLoss(message: String): Unit = {
+ private def reportDataLoss(message: String, getException: () => Throwable): Unit = {
if (failOnDataLoss) {
- throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+ throw getException()
} else {
logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
}
@@ -221,7 +225,7 @@ class KafkaContinuousPartitionReader(
// This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
// or if it's the endpoint of the data range (i.e. the "true" next offset).
- case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
+ case e: KafkaIllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
val range = consumer.getAvailableOffsetRange()
if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
// retry
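The reportDataLoss change above boils down to passing the exception lazily: the warning path still uses the plain message, while the classified exception is only constructed when failOnDataLoss is true. A simplified, standalone sketch of that pattern (not the exact stream code):

// Simplified sketch of the new reportDataLoss shape: the () => Throwable thunk
// defers building the classified exception until it is actually thrown.
def reportDataLoss(
    failOnDataLoss: Boolean,
    message: String,
    getException: () => Throwable): Unit = {
  if (failOnDataLoss) {
    throw getException()        // e.g. () => KafkaExceptions.partitionsDeleted(parts, config)
  } else {
    println(s"WARN: $message")  // the real code calls logWarning with an instruction suffix
  }
}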
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
index b0e30f37af51..300d288507a3 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
@@ -17,19 +17,23 @@
package org.apache.spark.sql.kafka010
+import scala.jdk.CollectionConverters._
+
import org.apache.kafka.common.TopicPartition
-import org.apache.spark.{ErrorClassesJsonReader, SparkException}
+import org.apache.spark.{ErrorClassesJsonReader, SparkException, SparkThrowable}
-object KafkaExceptions {
- private val errorClassesJsonReader: ErrorClassesJsonReader =
+private object KafkaExceptionsHelper {
+ val errorClassesJsonReader: ErrorClassesJsonReader =
new ErrorClassesJsonReader(
Seq(getClass.getClassLoader.getResource("error/kafka-error-classes.json")))
+}
+object KafkaExceptions {
def mismatchedTopicPartitionsBetweenEndOffsetAndPrefetched(
tpsForPrefetched: Set[TopicPartition],
tpsForEndOffset: Set[TopicPartition]): SparkException = {
- val errMsg = errorClassesJsonReader.getErrorMessage(
+ val errMsg = KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
"MISMATCHED_TOPIC_PARTITIONS_BETWEEN_END_OFFSET_AND_PREFETCHED",
Map(
"tpsForPrefetched" -> tpsForPrefetched.toString(),
@@ -42,7 +46,7 @@ object KafkaExceptions {
def endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
prefetchedOffset: Map[TopicPartition, Long],
endOffset: Map[TopicPartition, Long]): SparkException = {
- val errMsg = errorClassesJsonReader.getErrorMessage(
+ val errMsg = KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
"END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED",
Map(
"prefetchedOffset" -> prefetchedOffset.toString(),
@@ -55,7 +59,7 @@ object KafkaExceptions {
def lostTopicPartitionsInEndOffsetWithTriggerAvailableNow(
tpsForLatestOffset: Set[TopicPartition],
tpsForEndOffset: Set[TopicPartition]): SparkException = {
- val errMsg = errorClassesJsonReader.getErrorMessage(
+ val errMsg = KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
"LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW",
Map(
"tpsForLatestOffset" -> tpsForLatestOffset.toString(),
@@ -68,7 +72,7 @@ object KafkaExceptions {
def endOffsetHasGreaterOffsetForTopicPartitionThanLatestWithTriggerAvailableNow(
latestOffset: Map[TopicPartition, Long],
endOffset: Map[TopicPartition, Long]): SparkException = {
- val errMsg = errorClassesJsonReader.getErrorMessage(
+ val errMsg = KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
"END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_LATEST_WITH_TRIGGER_AVAILABLENOW",
Map(
"latestOffset" -> latestOffset.toString(),
@@ -77,4 +81,95 @@ object KafkaExceptions {
)
new SparkException(errMsg)
}
+
+ def couldNotReadOffsetRange(
+ startOffset: Long,
+ endOffset: Long,
+ topicPartition: TopicPartition,
+ groupId: String,
+ cause: Throwable): KafkaIllegalStateException = {
+ new KafkaIllegalStateException(
+ errorClass = "KAFKA_DATA_LOSS.COULD_NOT_READ_OFFSET_RANGE",
+ messageParameters = Map(
+ "startOffset" -> startOffset.toString,
+ "endOffset" -> endOffset.toString,
+ "topicPartition" -> topicPartition.toString,
+ "groupId" -> groupId),
+ cause = cause)
+ }
+
+ def startOffsetReset(
+ topicPartition: TopicPartition,
+ offset: Long,
+ fetchedOffset: Long): KafkaIllegalStateException = {
+ new KafkaIllegalStateException(
+ errorClass = "KAFKA_DATA_LOSS.START_OFFSET_RESET",
+ messageParameters = Map(
+ "topicPartition" -> topicPartition.toString,
+ "offset" -> offset.toString,
+ "fetchedOffset" -> fetchedOffset.toString))
+ }
+
+ def initialOffsetNotFoundForPartitions(
+ partitions: Set[TopicPartition]): KafkaIllegalStateException = {
+ new KafkaIllegalStateException(
+ errorClass = "KAFKA_DATA_LOSS.INITIAL_OFFSET_NOT_FOUND_FOR_PARTITIONS",
+ messageParameters = Map("partitions" -> partitions.toString))
+ }
+
+ def addedPartitionDoesNotStartFromZero(
+ topicPartition: TopicPartition,
+ startOffset: Long): KafkaIllegalStateException = {
+ new KafkaIllegalStateException(
+ errorClass = "KAFKA_DATA_LOSS.ADDED_PARTITION_DOES_NOT_START_FROM_OFFSET_ZERO",
+ messageParameters =
+ Map("topicPartition" -> topicPartition.toString, "startOffset" -> startOffset.toString))
+ }
+
+ def partitionsDeleted(
+ partitions: Set[TopicPartition],
+ groupIdConfigName: Option[String]): KafkaIllegalStateException = {
+ groupIdConfigName match {
+ case Some(config) =>
+ new KafkaIllegalStateException(
+ errorClass = "KAFKA_DATA_LOSS.PARTITIONS_DELETED_AND_GROUP_ID_CONFIG_PRESENT",
+ messageParameters = Map("partitions" -> partitions.toString, "groupIdConfig" -> config))
+ case None =>
+ new KafkaIllegalStateException(
+ errorClass = "KAFKA_DATA_LOSS.PARTITIONS_DELETED",
+ messageParameters = Map("partitions" -> partitions.toString))
+ }
+ }
+
+ def partitionOffsetChanged(
+ topicPartition: TopicPartition,
+ prevOffset: Long,
+ newOffset: Long): KafkaIllegalStateException = {
+ new KafkaIllegalStateException(
+ errorClass = "KAFKA_DATA_LOSS.PARTITION_OFFSET_CHANGED",
+ messageParameters = Map(
+ "topicPartition" -> topicPartition.toString,
+ "prevOffset" -> prevOffset.toString,
+ "newOffset" -> newOffset.toString))
+ }
+}
+
+/**
+ * Illegal state exception thrown with an error class.
+ */
+private[kafka010] class KafkaIllegalStateException(
+ errorClass: String,
+ messageParameters: Map[String, String],
+ cause: Throwable = null)
+ extends IllegalStateException(
+ KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
+ errorClass, messageParameters), cause)
+ with SparkThrowable {
+
+ override def getSqlState: String =
+ KafkaExceptionsHelper.errorClassesJsonReader.getSqlState(errorClass)
+
+ override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava
+
+ override def getErrorClass: String = errorClass
}
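A small sketch of how these factories behave, with illustrative values; since KafkaIllegalStateException is private[kafka010], code like this would live inside that package (for example in tests):

import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.kafka010.KafkaExceptions

// Sketch: build a classified data-loss exception and inspect its error class and parameters.
val e = KafkaExceptions.partitionOffsetChanged(
  new TopicPartition("topic-1", 0), prevOffset = 2L, newOffset = 1L)
assert(e.getErrorClass == "KAFKA_DATA_LOSS.PARTITION_OFFSET_CHANGED")
assert(e.getMessageParameters.get("prevOffset") == "2")
// e.getMessage is the rendered template from kafka-error-classes.json;
// e.getSqlState reflects the "22000" declared for KAFKA_DATA_LOSS.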
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index e92ebecfce08..fefa3efcc353 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -302,12 +302,12 @@ private[kafka010] class KafkaMicroBatchStream(
}
/**
- * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+ * If `failOnDataLoss` is true, this method will throw the exception.
* Otherwise, just log a warning.
*/
- private def reportDataLoss(message: String): Unit = {
+ private def reportDataLoss(message: String, getException: () => Throwable): Unit = {
if (failOnDataLoss) {
- throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+ throw getException()
} else {
logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
}
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index b1370e10501f..df0c7e9c0425 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -62,7 +62,7 @@ private[kafka010] trait KafkaOffsetReader {
*/
def fetchSpecificOffsets(
partitionOffsets: Map[TopicPartition, Long],
- reportDataLoss: String => Unit): KafkaSourceOffset
+ reportDataLoss: (String, () => Throwable) => Unit): KafkaSourceOffset
/**
* Resolves the specific offsets based on timestamp per topic-partition.
@@ -147,7 +147,7 @@ private[kafka010] trait KafkaOffsetReader {
def getOffsetRangesFromResolvedOffsets(
fromPartitionOffsets: PartitionOffsetMap,
untilPartitionOffsets: PartitionOffsetMap,
- reportDataLoss: String => Unit): Seq[KafkaOffsetRange]
+ reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange]
}
private[kafka010] object KafkaOffsetReader extends Logging {
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
index 9206dfe9b3f2..27adccf6f902 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
@@ -148,7 +148,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
override def fetchSpecificOffsets(
partitionOffsets: Map[TopicPartition, Long],
- reportDataLoss: String => Unit): KafkaSourceOffset = {
+ reportDataLoss: (String, () => Throwable) => Unit): KafkaSourceOffset = {
val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
assert(partitions.asScala == partitionOffsets.keySet,
"If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
@@ -404,8 +404,10 @@ private[kafka010] class KafkaOffsetReaderAdmin(
offsetRangesBase.map(range => (range.topicPartition, range.untilOffset)).toMap
// No need to report data loss here
- val resolvedFromOffsets = fetchSpecificOffsets(fromOffsetsMap, _ => ()).partitionToOffsets
- val resolvedUntilOffsets = fetchSpecificOffsets(untilOffsetsMap, _ => ()).partitionToOffsets
+ val resolvedFromOffsets =
+ fetchSpecificOffsets(fromOffsetsMap, (_, _) => ()).partitionToOffsets
+ val resolvedUntilOffsets =
+ fetchSpecificOffsets(untilOffsetsMap, (_, _) => ()).partitionToOffsets
val ranges = offsetRangesBase.map(_.topicPartition).map { tp =>
KafkaOffsetRange(tp, resolvedFromOffsets(tp), resolvedUntilOffsets(tp), preferredLoc = None)
}
@@ -444,7 +446,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
override def getOffsetRangesFromResolvedOffsets(
fromPartitionOffsets: PartitionOffsetMap,
untilPartitionOffsets: PartitionOffsetMap,
- reportDataLoss: String => Unit): Seq[KafkaOffsetRange] = {
+ reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
// Find the new partitions, and get their earliest offsets
val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
@@ -452,22 +454,31 @@ private[kafka010] class KafkaOffsetReaderAdmin(
// We cannot get from offsets for some partitions. It means they got deleted.
val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
reportDataLoss(
- s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
+ () =>
+ KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
}
logInfo(s"Partitions added: $newPartitionInitialOffsets")
newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
reportDataLoss(
- s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed",
+ () => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
}
val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
if (deletedPartitions.nonEmpty) {
- val message = if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
- s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
- } else {
- s"$deletedPartitions are gone. Some data may have been missed."
- }
- reportDataLoss(message)
+ val (message, config) =
+ if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+ (s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+ Some(ConsumerConfig.GROUP_ID_CONFIG))
+ } else {
+ (s"$deletedPartitions are gone. Some data may have been missed.", None)
+ }
+
+ reportDataLoss(
+ message,
+ () =>
+ KafkaExceptions.partitionsDeleted(deletedPartitions, config))
}
// Use the until partitions to calculate offset ranges to ignore partitions that have
@@ -484,8 +495,11 @@ private[kafka010] class KafkaOffsetReaderAdmin(
val fromOffset = fromOffsets(tp)
val untilOffset = untilOffsets(tp)
if (untilOffset < fromOffset) {
- reportDataLoss(s"Partition $tp's offset was changed from " +
- s"$fromOffset to $untilOffset, some data may have been missed")
+ reportDataLoss(
+ s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed",
+ () =>
+ KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
}
KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
}
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
index a859cd3d55a9..d4953a4a65e3 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
@@ -170,7 +170,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
override def fetchSpecificOffsets(
partitionOffsets: Map[TopicPartition, Long],
- reportDataLoss: String => Unit): KafkaSourceOffset = {
+ reportDataLoss: (String, () => Throwable) => Unit): KafkaSourceOffset = {
val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
assert(partitions.asScala == partitionOffsets.keySet,
"If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
@@ -189,7 +189,9 @@ private[kafka010] class KafkaOffsetReaderConsumer(
off != KafkaOffsetRangeLimit.EARLIEST =>
if (fetched(tp) != off) {
reportDataLoss(
- s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}")
+ s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}",
+ () =>
+ KafkaExceptions.startOffsetReset(tp, off, fetched(tp)))
}
case _ =>
// no real way to check that beginning or end is reasonable
@@ -451,8 +453,10 @@ private[kafka010] class KafkaOffsetReaderConsumer(
offsetRangesBase.map(range => (range.topicPartition, range.untilOffset)).toMap
// No need to report data loss here
- val resolvedFromOffsets = fetchSpecificOffsets(fromOffsetsMap, _ => ()).partitionToOffsets
- val resolvedUntilOffsets = fetchSpecificOffsets(untilOffsetsMap, _ => ()).partitionToOffsets
+ val resolvedFromOffsets =
+ fetchSpecificOffsets(fromOffsetsMap, (_, _) => ()).partitionToOffsets
+ val resolvedUntilOffsets =
+ fetchSpecificOffsets(untilOffsetsMap, (_, _) => ()).partitionToOffsets
val ranges = offsetRangesBase.map(_.topicPartition).map { tp =>
KafkaOffsetRange(tp, resolvedFromOffsets(tp), resolvedUntilOffsets(tp), preferredLoc = None)
}
@@ -491,7 +495,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
override def getOffsetRangesFromResolvedOffsets(
fromPartitionOffsets: PartitionOffsetMap,
untilPartitionOffsets: PartitionOffsetMap,
- reportDataLoss: String => Unit): Seq[KafkaOffsetRange] = {
+ reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
// Find the new partitions, and get their earliest offsets
val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
@@ -499,22 +503,31 @@ private[kafka010] class KafkaOffsetReaderConsumer(
// We cannot get from offsets for some partitions. It means they got deleted.
val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
reportDataLoss(
- s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+ s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
+ () =>
+ KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
}
logInfo(s"Partitions added: $newPartitionInitialOffsets")
newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
reportDataLoss(
- s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+ s"Added partition $p starts from $o instead of 0. Some data may have been missed",
+ () => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
}
val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
if (deletedPartitions.nonEmpty) {
- val message = if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
- s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
- } else {
- s"$deletedPartitions are gone. Some data may have been missed."
- }
- reportDataLoss(message)
+ val (message, config) =
+ if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+ (s"$deletedPartitions are gone.${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+ Some(ConsumerConfig.GROUP_ID_CONFIG))
+ } else {
+ (s"$deletedPartitions are gone. Some data may have been missed.", None)
+ }
+
+ reportDataLoss(
+ message,
+ () =>
+ KafkaExceptions.partitionsDeleted(deletedPartitions, config))
}
// Use the until partitions to calculate offset ranges to ignore partitions that have
@@ -531,8 +544,11 @@ private[kafka010] class KafkaOffsetReaderConsumer(
val fromOffset = fromOffsets(tp)
val untilOffset = untilOffsets(tp)
if (untilOffset < fromOffset) {
- reportDataLoss(s"Partition $tp's offset was changed from " +
- s"$fromOffset to $untilOffset, some data may have been missed")
+ reportDataLoss(
+ s"Partition $tp's offset was changed from " +
+ s"$fromOffset to $untilOffset, some data may have been missed",
+ () =>
+ KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
}
KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
}
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 6f6d6319cd6f..83ed7fff23fc 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -344,12 +344,12 @@ private[kafka010] class KafkaSource(
override def toString(): String = s"KafkaSourceV1[$kafkaReader]"
/**
- * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+ * If `failOnDataLoss` is true, this method will throw the exception.
* Otherwise, just log a warning.
*/
- private def reportDataLoss(message: String): Unit = {
+ private def reportDataLoss(message: String, getException: () => Throwable): Unit = {
if (failOnDataLoss) {
- throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+ throw getException()
} else {
logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
}
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 73446eddd25f..a5fc00ff29ff 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -585,14 +585,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
| option "failOnDataLoss" to "true".
""".stripMargin
- val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE =
- """
- |Some data may have been lost because they are not available in Kafka any more; either the
- | data was aged out by Kafka or the topic may have been deleted before all the data in the
- | topic was processed. If you don't want your streaming query to fail on such cases, set the
- | source option "failOnDataLoss" to "false".
- """.stripMargin
-
val CUSTOM_GROUP_ID_ERROR_MESSAGE =
s"""Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set on this query, it is
| not recommended to set this option. This option is unsafe to use since multiple concurrent
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index 2bd883a3cd51..fbc4a500322e 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenUtil}
+import org.apache.spark.sql.kafka010.KafkaExceptions
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
@@ -340,8 +341,11 @@ private[kafka010] class KafkaDataConsumer(
releaseConsumer()
fetchedData.reset()
- reportDataLoss(topicPartition, groupId, failOnDataLoss,
- s"Cannot fetch offset $toFetchOffset", e)
+ if (failOnDataLoss) {
+ throwOnDataLoss(toFetchOffset, untilOffset, topicPartition, groupId, e)
+ } else {
+ logOnDataLoss(topicPartition, groupId, s"Cannot fetch offset $toFetchOffset", e)
+ }
val oldToFetchOffsetd = toFetchOffset
toFetchOffset = getEarliestAvailableOffsetBetween(consumer, toFetchOffset, untilOffset)
@@ -443,7 +447,7 @@ private[kafka010] class KafkaDataConsumer(
s"""
|The current available offset range is $range.
| Offset $offset is out of range, and records in [$offset, $untilOffset) will be
- | skipped ${additionalMessage(topicPartition, groupId, failOnDataLoss = false)}
+ | skipped ${additionalWarningMessage(topicPartition, groupId)}
""".stripMargin
logWarning(warningMessage)
UNKNOWN_OFFSET
@@ -457,7 +461,7 @@ private[kafka010] class KafkaDataConsumer(
// then we will see `offset` disappears first then appears again. Although the parameters
// are same, the state in Kafka cluster is changed, so the outer loop won't be endless.
logWarning(s"Found a disappeared offset $offset. Some data may be lost " +
- s"${additionalMessage(topicPartition, groupId, failOnDataLoss = false)}")
+ s"${additionalWarningMessage(topicPartition, groupId)}")
offset
} else {
// ------------------------------------------------------------------------------
@@ -468,7 +472,7 @@ private[kafka010] class KafkaDataConsumer(
s"""
|The current available offset range is $range.
| Offset ${offset} is out of range, and records in [$offset, ${range.earliest}) will be
- | skipped ${additionalMessage(topicPartition, groupId, failOnDataLoss = false)}
+ | skipped ${additionalWarningMessage(topicPartition, groupId)}
""".stripMargin
logWarning(warningMessage)
range.earliest
@@ -535,18 +539,17 @@ private[kafka010] class KafkaDataConsumer(
}
// This may happen when some records aged out but their offsets already got verified
if (failOnDataLoss) {
- reportDataLoss(consumer.topicPartition, consumer.groupId, failOnDataLoss = true,
- s"Cannot fetch records in [$offset, ${record.offset})")
+ throwOnDataLoss(offset, record.offset, consumer.topicPartition, consumer.groupId)
// Never happen as "reportDataLoss" will throw an exception
throw new IllegalStateException(
"reportDataLoss didn't throw an exception when 'failOnDataLoss' is true")
} else if (record.offset >= untilOffset) {
- reportDataLoss(consumer.topicPartition, consumer.groupId, failOnDataLoss = false,
+ logOnDataLoss(consumer.topicPartition, consumer.groupId,
s"Skip missing records in [$offset, $untilOffset)")
// Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
fetchedRecord.withRecord(null, untilOffset)
} else {
- reportDataLoss(consumer.topicPartition, consumer.groupId, failOnDataLoss = false,
+ logOnDataLoss(consumer.topicPartition, consumer.groupId,
s"Skip missing records in [$offset, ${record.offset})")
fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
}
@@ -624,31 +627,47 @@ private[kafka010] class KafkaDataConsumer(
/**
* Return an addition message including useful message and instruction.
*/
- private def additionalMessage(
+ private def additionalWarningMessage(
+ topicPartition: TopicPartition,
+ groupId: String): String = {
+ s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
+ s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE"
+ }
+
+ /**
+ * Throw an exception when data loss is detected.
+ */
+ private def throwOnDataLoss(
+ startOffset: Long,
+ endOffset: Long,
topicPartition: TopicPartition,
groupId: String,
- failOnDataLoss: Boolean): String = {
- if (failOnDataLoss) {
- s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
- s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE"
- } else {
- s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
- s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE"
- }
+ cause: Throwable = null): Unit = {
+ dataLoss += 1
+ throw KafkaExceptions.couldNotReadOffsetRange(
+ startOffset,
+ endOffset,
+ topicPartition,
+ groupId,
+ cause)
}
/**
- * Throw an exception or log a warning as per `failOnDataLoss`.
+ * Log a warning when data loss is detected.
*/
- private def reportDataLoss(
+ private def logOnDataLoss(
topicPartition: TopicPartition,
groupId: String,
- failOnDataLoss: Boolean,
message: String,
cause: Throwable = null): Unit = {
- val finalMessage = s"$message ${additionalMessage(topicPartition, groupId, failOnDataLoss)}"
+ val finalMessage = s"$message ${additionalWarningMessage(topicPartition, groupId)}"
+
dataLoss += 1
- reportDataLoss0(failOnDataLoss, finalMessage, cause)
+ if (cause != null) {
+ logWarning(finalMessage, cause)
+ } else {
+ logWarning(finalMessage)
+ }
}
private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match {
@@ -714,24 +733,4 @@ private[kafka010] object KafkaDataConsumer extends Logging {
new KafkaDataConsumer(topicPartition, kafkaParams, consumerPool, fetchedDataPool)
}
-
- private def reportDataLoss0(
- failOnDataLoss: Boolean,
- finalMessage: String,
- cause: Throwable = null): Unit = {
- if (failOnDataLoss) {
- if (cause != null) {
- throw new IllegalStateException(finalMessage, cause)
- } else {
- throw new IllegalStateException(finalMessage)
- }
- } else {
- if (cause != null) {
- logWarning(finalMessage, cause)
- } else {
- logWarning(finalMessage)
- }
- }
- }
-
}
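In isolation, the KafkaDataConsumer change above splits one boolean-driven helper into two intent-revealing ones, so each call site decides explicitly whether to throw the classified exception or only warn. A condensed sketch of the call-site pattern (surrounding values are assumed from the consumer's state):

// Condensed sketch of the new call-site pattern in KafkaDataConsumer: the caller
// chooses between throwing the classified exception and logging a warning,
// instead of passing failOnDataLoss into a single reportDataLoss helper.
if (failOnDataLoss) {
  // raises KafkaExceptions.couldNotReadOffsetRange(...) with error class
  // KAFKA_DATA_LOSS.COULD_NOT_READ_OFFSET_RANGE
  throwOnDataLoss(toFetchOffset, untilOffset, topicPartition, groupId, e)
} else {
  logOnDataLoss(topicPartition, groupId, s"Cannot fetch offset $toFetchOffset", e)
}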
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index fb5e71a1e7b8..9ae6a9290f80 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -904,7 +904,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
testUtils.sendMessages(topic2, Array("6"))
},
StartStream(),
- ExpectFailure[IllegalStateException](e => {
+ ExpectFailure[KafkaIllegalStateException](e => {
// The offset of `topic2` should be changed from 2 to 1
assert(e.getMessage.contains("was changed from 2 to 1"))
})
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
index 332db5483b83..691e81f02a8c 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
@@ -170,7 +170,7 @@ class KafkaOffsetReaderSuite extends QueryTest with SharedSparkSession with Kafk
val offsetRanges = reader.getOffsetRangesFromResolvedOffsets(
fromPartitionOffsets,
untilPartitionOffsets,
- _ => {})
+ (_, _) => {})
assert(offsetRanges.sortBy(_.topicPartition.toString) === Seq(
KafkaOffsetRange(tp1, 0, 33, None),
KafkaOffsetRange(tp1, 33, 66, None),
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
index b6748d0f261e..dc3319499982 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
@@ -32,7 +32,7 @@ import org.scalatest.PrivateMethodTester
import org.apache.spark.{TaskContext, TaskContextImpl}
import org.apache.spark.kafka010.KafkaDelegationTokenTest
-import org.apache.spark.sql.kafka010.{KafkaTestUtils, RecordBuilder}
+import org.apache.spark.sql.kafka010.{KafkaIllegalStateException, KafkaTestUtils, RecordBuilder}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.ResetSystemProperties
@@ -91,11 +91,12 @@ class KafkaDataConsumerSuite
consumerPool.reset()
}
- test("SPARK-19886: Report error cause correctly in reportDataLoss") {
+ test("SPARK-19886: Report error cause correctly in throwOnDataLoss") {
val cause = new Exception("D'oh!")
- val reportDataLoss = PrivateMethod[Unit](Symbol("reportDataLoss0"))
- val e = intercept[IllegalStateException] {
- KafkaDataConsumer.invokePrivate(reportDataLoss(true, "message", cause))
+ val throwOnDataLoss = PrivateMethod[Unit](Symbol("throwOnDataLoss"))
+ val consumer = KafkaDataConsumer.acquire(topicPartition, getKafkaParams())
+ val e = intercept[KafkaIllegalStateException] {
+ consumer.invokePrivate(throwOnDataLoss(0L, 1L, topicPartition, groupId, cause))
}
assert(e.getCause === cause)
}