Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/08/12 08:26:00 UTC

[jira] [Updated] (SPARK-28641) MicroBatchExecution committed offsets greater than available offsets

     [ https://issues.apache.org/jira/browse/SPARK-28641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-28641:
---------------------------------
    Target Version/s:   (was: 2.3.1)

> MicroBatchExecution committed offsets greater than available offsets
> --------------------------------------------------------------------
>
>                 Key: SPARK-28641
>                 URL: https://issues.apache.org/jira/browse/SPARK-28641
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>         Environment: HDP --> 3.0.0
> Spark --> 2.3.1
> Kafka --> 2.1.1
>            Reporter: MariaCarrie
>            Priority: Major
>              Labels: MicroBatchExecution, dataAvailable
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> I use Structured Streaming to consume Kafka data. The trigger type is the default and checkpointing is enabled. Looking at the log, however, I find that Structured Streaming is processing data from before the offsets it had already reached. The application log is as follows:
>  
> 19/07/31 15:25:50 INFO KafkaSource: GetBatch called with start = Some({"dop_dvi_formatted-send_pus":{"2":13978245,"4":13978260,"1":13978249,"3":13978233,"0":13978242}}), end = {"dop_dvi_formatted-send_pus":{"2":13978245,"4":9053058,"1":13978249,"3":13978233,"0":13978242}}
> 19/07/31 15:25:50 INFO KafkaSource: Partitions added: Map()
> 19/07/31 15:25:50 WARN KafkaSource: Partition dop_dvi_formatted-send_pus-4's offset was changed from 13978260 to 9053058, some data may have been missed.
> Some data may have been lost because they are not available in Kafka any more; either the
>  data was aged out by Kafka or the topic may have been deleted before all the data in the
>  topic was processed. If you want your streaming query to fail on such cases, set the source
>  option "failOnDataLoss" to "true".
>  
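The warning quoted above refers to the Kafka source option "failOnDataLoss". As a rough sketch of where that option is set, the snippet below configures it on a Kafka stream reader. The broker address and checkpoint path are placeholders invented for illustration and do not come from the report; only the topic name is taken from the log. It assumes the spark-sql-kafka-0-10 package is on the classpath.

```scala
import org.apache.spark.sql.SparkSession

object FailOnDataLossSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fail-on-data-loss-sketch")
      .getOrCreate()

    val stream = spark.readStream
      .format("kafka")
      // Placeholder broker address (assumption, not from the report).
      .option("kafka.bootstrap.servers", "broker1:9092")
      // Topic name as it appears in the quoted log.
      .option("subscribe", "dop_dvi_formatted-send_pus")
      // Fail the query instead of only warning when offsets go missing.
      .option("failOnDataLoss", "true")
      .load()

    val query = stream
      .selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      // Placeholder checkpoint directory (assumption, not from the report).
      .option("checkpointLocation", "/tmp/checkpoint-sketch")
      .start()

    query.awaitTermination()
  }
}
```

With the option set to "true", a regression like the one logged for partition 4 would be expected to stop the query rather than let it continue with a warning.
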
> I see that when the latest offsets are fetched, they are compared with committedOffsets to decide whether there is new data.
>  
> private def dataAvailable: Boolean = {
>   availableOffsets.exists {
>     case (source, available) =>
>       committedOffsets
>         .get(source)
>         .map(committed => committed != available)
>         .getOrElse(true)
>   }
> }
>  
> I suspect that some problem in Kafka caused the fetchLatestOffsets method to return the earliest offsets, even though the data up to the committed offsets had already been processed and committed successfully. Could the dataAvailable method check for this case, so that if the available offsets have already been committed, the batch is no longer marked as new data?
>  
> I am not sure whether my understanding is correct. If processing continues from the earliest offsets, Structured Streaming cannot respond in a timely manner. I would be glad to receive any suggestions!
>  
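To make the question in the description concrete, here is a minimal sketch of the check and of one possible stricter variant. It is not Spark code. Offsets are modelled as plain maps from partition id to offset, the "start" value from the log is assumed to be the committed offset and "end" the available one, and dataAvailableForwardOnly is a hypothetical name for the behaviour the reporter asks about. In Spark itself the offset type is opaque to MicroBatchExecution, so a real change would need source-specific comparison logic.

```scala
object DataAvailableSketch {
  // Simplified stand-in for a Kafka source offset: partition id -> offset.
  type PartitionOffsets = Map[Int, Long]

  // Same shape as the quoted check: any difference counts as new data.
  def dataAvailable(
      available: Map[String, PartitionOffsets],
      committed: Map[String, PartitionOffsets]): Boolean =
    available.exists { case (source, avail) =>
      committed.get(source).map(c => c != avail).getOrElse(true)
    }

  // Hypothetical variant: new data only if a partition is new
  // or its available offset is beyond the committed one.
  def dataAvailableForwardOnly(
      available: Map[String, PartitionOffsets],
      committed: Map[String, PartitionOffsets]): Boolean =
    available.exists { case (source, avail) =>
      committed.get(source) match {
        case None => true
        case Some(c) =>
          avail.exists { case (partition, offset) =>
            c.get(partition).forall(offset > _)
          }
      }
    }

  private val topic = "dop_dvi_formatted-send_pus"

  // "start" offsets from the quoted log, assumed to be the committed ones.
  val committed: Map[String, PartitionOffsets] = Map(
    topic -> Map(2 -> 13978245L, 4 -> 13978260L, 1 -> 13978249L,
                 3 -> 13978233L, 0 -> 13978242L))

  // "end" offsets from the quoted log: partition 4 went backwards.
  val available: Map[String, PartitionOffsets] = Map(
    topic -> Map(2 -> 13978245L, 4 -> 9053058L, 1 -> 13978249L,
                 3 -> 13978233L, 0 -> 13978242L))

  def main(args: Array[String]): Unit = {
    println(dataAvailable(available, committed))
    println(dataAvailableForwardOnly(available, committed))
  }
}
```

With the offsets from the log, the original-style check reports new data because partition 4 differs, while the forward-only variant does not, since no partition has advanced. Whether skipping such a batch is the right behaviour is a separate question: a regressed offset can also mean the topic was deleted and recreated, which is the case the failOnDataLoss warning describes.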



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
