You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2020/06/21 18:30:00 UTC

[jira] [Assigned] (SPARK-32044) [SS] Kakfa continuous processing print mislead initial offsets log

     [ https://issues.apache.org/jira/browse/SPARK-32044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-32044:
------------------------------------

    Assignee: Apache Spark

> [SS] Kakfa continuous processing print mislead initial offsets log 
> -------------------------------------------------------------------
>
>                 Key: SPARK-32044
>                 URL: https://issues.apache.org/jira/browse/SPARK-32044
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.6
>            Reporter: Zhongwei Zhu
>            Assignee: Apache Spark
>            Priority: Trivial
>             Fix For: 2.4.7
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When using structured streaming in continuous processing mode, after restart spark job, spark job can correctly pick up offsets in checkpoint location from last epoch. But it always print out below log:
> 20/06/12 00:58:09 INFO [stream execution thread for [id = 34e5b909-f9fe-422a-89c0-081251a68693, runId = 0246e19d-aaa1-4a5c-9091-bab1a0578a0a]] kafka010.KafkaContinuousReader: Initial offsets: \{"kafka_topic":{"8":51618236,"11":51610655,"2":51622889,"5":51637171,"14":51637346,"13":51627784,"4":51606960,"7":51632475,"1":51636129,"10":51632212,"9":51634107,"3":51611013,"12":51626567,"15":51640774,"6":51637823,"0":51629106}}
>  And this log is misleading, as spark didn't use this one as initial offsets. This is caused by below code in KafkaContinuousReader
> {code:java}
> offset = start.orElse {
>   val offsets = initialOffsets match {
>     case EarliestOffsetRangeLimit =>
>       KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
>     case LatestOffsetRangeLimit =>
>       KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
>     case SpecificOffsetRangeLimit(p) =>
>       offsetReader.fetchSpecificOffsets(p, reportDataLoss)
>   }
>   logInfo(s"Initial offsets: $offsets")
>   offsets
> }
> {code}
>  The code inside orElse block is always executed even when start has value.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org