Posted to issues@spark.apache.org by "Wechar (Jira)" <ji...@apache.org> on 2022/12/03 19:10:00 UTC

[jira] [Created] (SPARK-41375) Avoid empty latest KafkaSourceOffset

Wechar created SPARK-41375:
------------------------------

             Summary: Avoid empty latest KafkaSourceOffset
                 Key: SPARK-41375
                 URL: https://issues.apache.org/jira/browse/SPARK-41375
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.3.1
            Reporter: Wechar


We found that `offsetLog` recorded an empty offset `{}` for the KafkaSource:

!image-2022-12-04-02-43-35-018.png!
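It occurred only once, but a single empty offset is enough to cause data duplication: the following batch finds no start offset for any partition and therefore likely re-reads every partition from its earliest offset.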
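A simplified model of why `{}` leads to duplicated data. It assumes that a partition present in a batch's end offset but missing from its start offset is treated as newly discovered and read from its earliest offset, which is how newly added partitions are normally handled. `TopicPartition`, `OffsetRange` and `planBatch` are hypothetical stand-ins, not Spark's API.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical read range for one partition: offsets in [from, until).
final case class OffsetRange(tp: TopicPartition, from: Long, until: Long)

object DuplicationModel {
  // Simplified batch planning. Assumption: a partition that appears in `end` but
  // not in `start` is treated as newly discovered and read from its earliest offset.
  def planBatch(
      start: Map[TopicPartition, Long],
      end: Map[TopicPartition, Long],
      earliest: TopicPartition => Long): Seq[OffsetRange] =
    end.toSeq.map { case (tp, until) =>
      OffsetRange(tp, start.getOrElse(tp, earliest(tp)), until)
    }

  def main(args: Array[String]): Unit = {
    val tp0 = TopicPartition("events", 0)
    val earliest: TopicPartition => Long = _ => 0L

    // Normal case: the previous batch ended at 100, so this batch reads [100, 120).
    println(planBatch(Map(tp0 -> 100L), Map(tp0 -> 120L), earliest))

    // Faulty case: the previous batch recorded `{}`, so this batch reads [0, 120)
    // and records 0 until 100 are processed a second time.
    println(planBatch(Map.empty, Map(tp0 -> 120L), earliest))
  }
}
```

In the real source the earliest offsets come from Kafka; here they are passed in as a function so the model runs without a broker.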

{*}Root Cause{*}:

The root cause is that the Kafka consumer may receive an empty partition assignment in edge cases, for example when partitions are fetched while the Kafka cluster is reassigning them.

{code:scala}
// org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer
  private def partitionsAssignedToConsumer(
      body: ju.Set[TopicPartition] => Map[TopicPartition, Long],
      fetchingEarliestOffset: Boolean = false)
    : Map[TopicPartition, Long] = uninterruptibleThreadRunner.runUninterruptibly {

    withRetriesWithoutInterrupt {
      // Poll to get the latest assigned partitions
      consumer.poll(0)
      val partitions = consumer.assignment() // partitions may be empty

      if (!fetchingEarliestOffset) {
        // Call `position` to wait until the potential offset request triggered by `poll(0)` is
        // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
        // `poll(0)` may reset offsets that should have been set by another request.
        partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
      }

      consumer.pause(partitions)
      logDebug(s"Partitions assigned to consumer: $partitions.")
      body(partitions)
    }
  }
{code}
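Assuming, as its signature suggests, that `body` builds its result by mapping over the partitions it receives, an empty `consumer.assignment()` yields an empty offset map, which the offset log then stores as `{}`. The sketch below models that path. `TopicPartition` is a local stand-in for Kafka's class, and `latestOffsets` and `toJson` are hypothetical helpers; `toJson` only imitates the `{"topic":{"partition":offset}}` layout of the offset log.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

object EmptyAssignmentModel {
  // Hypothetical `body`: maps every assigned partition to its end offset,
  // so an empty assignment necessarily produces an empty map.
  def latestOffsets(
      assigned: Set[TopicPartition],
      endOffsetOf: TopicPartition => Long): Map[TopicPartition, Long] =
    assigned.map(tp => tp -> endOffsetOf(tp)).toMap

  // Hypothetical serializer imitating the {"topic":{"partition":offset}} layout.
  def toJson(offsets: Map[TopicPartition, Long]): String =
    offsets
      .groupBy { case (tp, _) => tp.topic }
      .toSeq
      .sortBy(_._1)
      .map { case (topic, parts) =>
        val inner = parts.toSeq
          .sortBy { case (tp, _) => tp.partition }
          .map { case (tp, off) => s""""${tp.partition}":$off""" }
          .mkString(",")
        s""""$topic":{$inner}"""
      }
      .mkString("{", ",", "}")

  def main(args: Array[String]): Unit = {
    // Empty assignment (e.g. during reassignment): the logged offset is `{}`.
    println(toJson(latestOffsets(Set.empty, _ => 120L)))
    // Normal assignment: one entry per partition.
    println(toJson(latestOffsets(Set(TopicPartition("events", 0)), _ => 120L)))
  }
}
```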

*Solution:*
Add a filter in `latestOffset` so that an empty KafkaSourceOffset is never recorded in the offset log.
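A minimal sketch of such a filter, assuming it falls back to the start offset whenever the fetched latest offset is empty. `guardLatestOffset` and the local `TopicPartition` class are hypothetical and are not taken from the actual patch.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

object LatestOffsetGuard {
  // Hypothetical guard: an empty latest offset is treated as a transient failure
  // (e.g. an empty assignment during reassignment) and replaced by the start offset,
  // so the micro-batch is empty instead of recording `{}`.
  def guardLatestOffset(
      start: Map[TopicPartition, Long],
      latest: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
    if (latest.isEmpty && start.nonEmpty) start else latest

  def main(args: Array[String]): Unit = {
    val tp0 = TopicPartition("events", 0)
    val start = Map(tp0 -> 100L)
    println(guardLatestOffset(start, Map.empty))        // falls back to the start offset
    println(guardLatestOffset(start, Map(tp0 -> 120L))) // normal progress is unchanged
  }
}
```

Falling back to the start offset produces an empty batch rather than a `{}` entry. One trade-off of this choice: if every subscribed partition had genuinely disappeared, the stream would keep its last offset instead of reporting an empty one.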


