Posted to issues@spark.apache.org by "Wechar (Jira)" <ji...@apache.org> on 2022/12/03 19:10:00 UTC
[jira] [Created] (SPARK-41375) Avoid empty latest KafkaSourceOffset
Wechar created SPARK-41375:
------------------------------
Summary: Avoid empty latest KafkaSourceOffset
Key: SPARK-41375
URL: https://issues.apache.org/jira/browse/SPARK-41375
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 3.3.1
Reporter: Wechar
We found that the offsetLog recorded an empty offset `{}` for the KafkaSource:
(see the attached screenshot image-2022-12-04-02-43-35-018.png, which shows the empty `{}` entry in the offsetLog)
It occurs only once, but this single empty offset causes data duplication.
*Root Cause:*
The root cause is that the Kafka consumer may return an empty partition assignment in extreme cases, for example when partitions are fetched while the Kafka cluster is reassigning them.
{code:scala}
// org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer
private def partitionsAssignedToConsumer(
    body: ju.Set[TopicPartition] => Map[TopicPartition, Long],
    fetchingEarliestOffset: Boolean = false)
    : Map[TopicPartition, Long] = uninterruptibleThreadRunner.runUninterruptibly {
  withRetriesWithoutInterrupt {
    // Poll to get the latest assigned partitions
    consumer.poll(0)
    val partitions = consumer.assignment()  // partitions may be empty
    if (!fetchingEarliestOffset) {
      // Call `position` to wait until the potential offset request triggered by `poll(0)` is
      // done. This is a workaround for KAFKA-7703, where an async `seekToBeginning` triggered
      // by `poll(0)` may reset offsets that should have been set by another request.
      partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
    }
    consumer.pause(partitions)
    logDebug(s"Partitions assigned to consumer: $partitions.")
    body(partitions)
  }
}
{code}
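For reference, here is a minimal standalone sketch (not Spark's internal `JsonUtils`; the helper name `toOffsetJson` is hypothetical) of how an empty partition map degenerates into the literal `{}` entry observed in the offsetLog:
{code:scala}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper mimicking the {"topic":{"partition":offset}} layout of
// the offsets Spark records; it is NOT Spark's internal serializer.
def toOffsetJson(offsets: Map[TopicPartition, Long]): String =
  offsets
    .groupBy { case (tp, _) => tp.topic }
    .map { case (topic, parts) =>
      val inner = parts
        .map { case (tp, off) => s""""${tp.partition}":$off""" }
        .mkString(",")
      s""""$topic":{$inner}"""
    }
    .mkString("{", ",", "}")

// An empty assignment produces exactly the bad entry:
// toOffsetJson(Map.empty) == "{}"
{code}
If `consumer.assignment()` comes back empty during a rebalance, `body(partitions)` is invoked with an empty set, and the resulting empty `{}` is what gets committed to the offsetLog.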
*Solution:*
Add an offset filter for `latestOffset` so that an empty offset is never written to the offsetLog.
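A minimal sketch of one way to implement such a filter, assuming a simple bounded retry; the object `LatestOffsetGuard`, the helper `fetchNonEmptyOffsets`, and the retry policy are hypothetical and not the merged patch:
{code:scala}
import org.apache.kafka.common.TopicPartition

import scala.annotation.tailrec

object LatestOffsetGuard {
  // Retry the offset fetch a few times when the consumer reports an empty
  // assignment, instead of letting an empty latest offset reach the offsetLog.
  @tailrec
  def fetchNonEmptyOffsets(
      fetch: () => Map[TopicPartition, Long],
      attemptsLeft: Int = 3): Map[TopicPartition, Long] = {
    val offsets = fetch()
    if (offsets.nonEmpty || attemptsLeft <= 1) {
      offsets
    } else {
      // An empty assignment is likely a transient rebalance: back off and retry.
      Thread.sleep(100)
      fetchNonEmptyOffsets(fetch, attemptsLeft - 1)
    }
  }
}
{code}
If the assignment is still empty after the retries, the caller can keep the previously recorded offsets instead of persisting `{}`, which avoids the duplication described above.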
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org