Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/12/03 19:23:00 UTC

[jira] [Commented] (SPARK-41375) Avoid empty latest KafkaSourceOffset

    [ https://issues.apache.org/jira/browse/SPARK-41375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17642901#comment-17642901 ] 

Apache Spark commented on SPARK-41375:
--------------------------------------

User 'wecharyu' has created a pull request for this issue:
https://github.com/apache/spark/pull/38898

> Avoid empty latest KafkaSourceOffset
> ------------------------------------
>
>                 Key: SPARK-41375
>                 URL: https://issues.apache.org/jira/browse/SPARK-41375
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.3.1
>            Reporter: Wechar
>            Priority: Major
>         Attachments: image-2022-12-04-03-11-11-428.png
>
>
> We found that the offsetLog recorded an empty offset `{}` for the KafkaSource:
> !image-2022-12-04-03-11-11-428.png!
> It occurs only once, but this empty offset causes data duplication.
> *Root Cause:*
> The root cause is that the Kafka consumer may return an empty partition assignment in extreme cases, for example when the assignment is fetched while the Kafka cluster is reassigning partitions:
> {code:scala}
> // org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer
>   private def partitionsAssignedToConsumer(
>       body: ju.Set[TopicPartition] => Map[TopicPartition, Long],
>       fetchingEarliestOffset: Boolean = false)
>     : Map[TopicPartition, Long] = uninterruptibleThreadRunner.runUninterruptibly {
>     withRetriesWithoutInterrupt {
>       // Poll to get the latest assigned partitions
>       consumer.poll(0)
>       val partitions = consumer.assignment() // partitions may be empty
>       if (!fetchingEarliestOffset) {
>         // Call `position` to wait until the potential offset request triggered by `poll(0)` is
>         // done. This is a workaround for KAFKA-7703, where an async `seekToBeginning` triggered by
>         // `poll(0)` may reset offsets that should have been set by another request.
>         partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
>       }
>       consumer.pause(partitions)
>       logDebug(s"Partitions assigned to consumer: $partitions.")
>       body(partitions)
>     }
>   }
> {code}
> *Solution:*
> Add a filter in latestOffset so that an empty offset is never recorded in the offsetLog.
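>
> Below is a minimal sketch of one possible guard, for illustration only; it is not necessarily the change made in the linked pull request. It assumes a hypothetical helper inside the org.apache.spark.sql.kafka010 package (where KafkaSourceOffset is visible) that falls back to the previously known offset whenever the fetched partition-to-offset map is empty:
> {code:scala}
> import org.apache.kafka.common.TopicPartition
>
> // Hypothetical helper, for illustration only; assumed to live in the
> // org.apache.spark.sql.kafka010 package so that KafkaSourceOffset is visible.
> private def filteredLatestOffset(
>     fetched: Map[TopicPartition, Long],
>     previous: Option[KafkaSourceOffset]): Option[KafkaSourceOffset] = {
>   if (fetched.isEmpty) {
>     // An empty assignment typically means poll(0) raced with a partition
>     // reassignment; keep the previous offset instead of recording `{}`.
>     previous
>   } else {
>     Some(KafkaSourceOffset(fetched))
>   }
> }
> {code}
> With such a guard, latestOffset keeps returning the last non-empty offset during a transient reassignment window, so the offsetLog never records `{}`.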



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org