Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/12/03 19:23:00 UTC
[jira] [Assigned] (SPARK-41375) Avoid empty latest KafkaSourceOffset
[ https://issues.apache.org/jira/browse/SPARK-41375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-41375:
------------------------------------
Assignee: Apache Spark
> Avoid empty latest KafkaSourceOffset
> ------------------------------------
>
> Key: SPARK-41375
> URL: https://issues.apache.org/jira/browse/SPARK-41375
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.3.1
> Reporter: Wechar
> Assignee: Apache Spark
> Priority: Major
> Attachments: image-2022-12-04-03-11-11-428.png
>
>
> We found the offsetLog recorded an empty offset `{}` for the KafkaSource:
> !image-2022-12-04-03-11-11-428.png!
> This happened only once, but the empty offset causes data duplication.
> *Root Cause:*
> The root cause is that the Kafka consumer may return an empty partition assignment in rare cases, for example when the assignment is read while the Kafka cluster is reassigning partitions.
> {code:scala}
> // org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer
> private def partitionsAssignedToConsumer(
>     body: ju.Set[TopicPartition] => Map[TopicPartition, Long],
>     fetchingEarliestOffset: Boolean = false)
>   : Map[TopicPartition, Long] = uninterruptibleThreadRunner.runUninterruptibly {
>   withRetriesWithoutInterrupt {
>     // Poll to get the latest assigned partitions
>     consumer.poll(0)
>     val partitions = consumer.assignment() // partitions may be empty
>     if (!fetchingEarliestOffset) {
>       // Call `position` to wait until the potential offset request triggered by `poll(0)` is
>       // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
>       // `poll(0)` may reset offsets that should have been set by another request.
>       partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
>     }
>     consumer.pause(partitions)
>     logDebug(s"Partitions assigned to consumer: $partitions.")
>     body(partitions)
>   }
> }
> {code}
> *Solution:*
> Add an offset filter to latestOffset so that an empty KafkaSourceOffset is not reported as the latest offset.
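
The filtering idea in the proposed solution can be illustrated with a minimal, self-contained sketch. This is not the actual patch: `TopicPartition` here is a hypothetical stand-in for `org.apache.kafka.common.TopicPartition`, and `LatestOffsetFilter` and `nonEmptyLatest` are made-up names. It also assumes that an empty offset map means the consumer briefly had no assignment, rather than that the topic truly has no partitions.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
// used so this sketch compiles without a Kafka dependency.
final case class TopicPartition(topic: String, partition: Int)

object LatestOffsetFilter {
  type PartitionOffsets = Map[TopicPartition, Long]

  // Assumption: an empty result is a transient artifact (e.g. partition
  // reassignment), so it should not be treated as a valid latest offset.
  // Returns None for an empty (or null) map so the caller can skip recording it.
  def nonEmptyLatest(fetched: PartitionOffsets): Option[PartitionOffsets] =
    Option(fetched).filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    val normal: PartitionOffsets = Map(TopicPartition("events", 0) -> 42L)
    val duringReassignment: PartitionOffsets = Map.empty

    // A populated map passes through; an empty map is filtered out.
    println(nonEmptyLatest(normal))
    println(nonEmptyLatest(duringReassignment))
  }
}
```

With a filter of this shape, a caller that receives `None` would keep its previously recorded offset instead of writing `{}` to the offset log. How the real source handles that case is outside what this sketch shows.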
--
This message was sent by Atlassian Jira
(v8.20.10#820010)