Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2022/04/08 07:13:00 UTC

[jira] [Updated] (SPARK-38824) Bug in async commit of Kafka offset in DirectKafkaInputDStream

     [ https://issues.apache.org/jira/browse/SPARK-38824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-38824:
----------------------------------
    Component/s: DStreams
                     (was: Spark Core)

> Bug in async commit of Kafka offset in DirectKafkaInputDStream
> --------------------------------------------------------------
>
>                 Key: SPARK-38824
>                 URL: https://issues.apache.org/jira/browse/SPARK-38824
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.4.8, 3.0.0, 3.0.3, 3.1.0, 3.1.2, 3.2.1
>            Reporter: SOUVIK PAUL
>            Priority: Major
>
> I added a few debug statements at the following lines and found a few issues.
> 1. At line 254 of override def compute(validTime: Time): Option[KafkaRDD[K, V]] in DirectKafkaInputDStream.scala:
> System.out.print("Called commitAll at time " + validTime + " " +
> commitQueue.toArray.mkString("Array(", ", ", ")") + "\n")
> 2. At line 454 of test("offset recovery from kafka") in DirectKafkaStreamSuite.scala:
> print("Called commitAsync at " + time + " " + offsets.mkString("Array(", ", ", ")") + "\n")
> The output shows that the commitAll call is not handled properly. Because commitAll is called inside the compute function, offsets queued by commitAsync are only committed when a later batch is computed, so there is a chance that the offsets of the last RDD are never committed. In the current example, the commit for the offset range 8 -> 10 was missed.
> Can someone confirm whether this is a design choice or a bug?
> The current log looks like this:
> Called commitAll at time 1645548063100 ms Array()
> Called commitAll at time 1645548063200 ms Array()
> Called commitAll at time 1645548063300 ms Array()
> Called commitAll at time 1645548063400 ms Array()
> Called commitAll at time 1645548063500 ms Array()
> Called commitAll at time 1645548063600 ms Array()
> Called commitAll at time 1645548063700 ms Array()
> Called commitAll at time 1645548063800 ms Array()
> Called commitAll at time 1645548063900 ms Array()
> Called commitAll at time 1645548064000 ms Array()
> Called commitAll at time 1645548064100 ms Array()
> Called commitAll at time 1645548064200 ms Array()
> Called commitAsync at 1645548063100 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [0 -> 4]))
> Called commitAsync at 1645548063200 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064300 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [0 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063300 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063400 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063500 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064400 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063600 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063700 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063800 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548063900 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064500 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064000 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064100 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064200 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064300 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAll at time 1645548064600 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 4]))
> Called commitAsync at 1645548064400 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 8]))
> Called commitAsync at 1645548064500 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAsync at 1645548064600 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAll at time 1645548064700 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [4 -> 8]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]), OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 8]))
> Called commitAsync at 1645548064700 ms Array(OffsetRange(topic: 'recoveryfromkafka', partition: 0, range: [8 -> 10]))
> Regards,
> Souvik Paul
> GitHub: @paulsouri
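
The behaviour described in the report can be reproduced with a small toy model. The sketch below is not Spark code: ToyRange, ToyStream and CommitQueueSketch are hypothetical names, and the only assumption taken from the report is that commitAsync merely queues a range while commitAll, called from compute, is what drains the queue.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an offset range; not Spark's OffsetRange.
final case class ToyRange(from: Long, until: Long)

// Toy model of the reported behaviour (an assumption, not the Spark source):
// commitAsync only enqueues, and the queue is drained by commitAll,
// which runs only when compute is called for a batch.
final class ToyStream {
  private val commitQueue = mutable.Queue.empty[ToyRange]
  private val committedRanges = mutable.ArrayBuffer.empty[ToyRange]

  // Called by the application after it has processed a batch.
  def commitAsync(r: ToyRange): Unit = commitQueue.enqueue(r)

  // Drains everything queued so far.
  private def commitAll(): Unit =
    while (commitQueue.nonEmpty) committedRanges += commitQueue.dequeue()

  // Called once per batch interval.
  def compute(): Unit = commitAll()

  def committed: List[ToyRange] = committedRanges.toList
  def pending: List[ToyRange] = commitQueue.toList
}

object CommitQueueSketch {
  // Runs one compute per batch, followed by the commitAsync for that batch.
  def simulate(batches: Seq[ToyRange]): ToyStream = {
    val stream = new ToyStream
    batches.foreach { r =>
      stream.compute()      // drains ranges queued by earlier batches
      stream.commitAsync(r) // this batch's range waits for a later compute
    }
    stream
  }

  def main(args: Array[String]): Unit = {
    val s = simulate(Seq(ToyRange(0, 4), ToyRange(4, 8), ToyRange(8, 10)))
    println(s"committed: ${s.committed}")
    println(s"pending:   ${s.pending}")
  }
}
```

In this model the ranges 0 -> 4 and 4 -> 8 end up committed, while 8 -> 10 stays in the queue because no further compute call follows the last commitAsync. This matches the missing final commit in the log above. Whether the real stream drains the queue on shutdown is not something this sketch can show.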



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
