You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "sandeep katta (JIRA)" <ji...@apache.org> on 2018/10/24 04:13:00 UTC

[jira] [Commented] (SPARK-25810) Spark structured streaming logs auto.offset.reset=earliest even though startingOffsets is set to latest

    [ https://issues.apache.org/jira/browse/SPARK-25810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16661703#comment-16661703 ] 

sandeep katta commented on SPARK-25810:
---------------------------------------

{color:#FF0000}[~abanthiy] {color}thanks for reporting this, Can you please share me the logs screenshot to check exactly which part of the flow is misleading . is it coming part of 

ConsumerConfig values ??

> Spark structured streaming logs auto.offset.reset=earliest even though startingOffsets is set to latest
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-25810
>                 URL: https://issues.apache.org/jira/browse/SPARK-25810
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: ANUJA BANTHIYA
>            Priority: Trivial
>
> I have a  issue when i'm trying to read data from kafka using spark structured streaming. 
> Versions : spark-core_2.11 : 2.3.1, spark-sql_2.11 : 2.3.1, spark-sql-kafka-0-10_2.11 : 2.3.1, kafka-client :0.11.0.0
> The issue i am facing is that the spark job always logs auto.offset.reset = earliest  even though latest option is specified in the code during startup of application .
> Code to reproduce: 
> {code:java}
> package com.informatica.exec
> import org.apache.spark.sql.SparkSession
> object kafkaLatestOffset {
>  def main(s: Array[String]) {
>  val spark = SparkSession
>  .builder()
>  .appName("Spark Offset basic example")
>  .master("local[*]")
>  .getOrCreate()
>  val df = spark
>  .readStream
>  .format("kafka")
>  .option("kafka.bootstrap.servers", "localhost:9092")
>  .option("subscribe", "topic1")
>  .option("startingOffsets", "latest")
>  .load()
>  val query = df.writeStream
>  .outputMode("complete")
>  .format("console")
>  .start()
>  query.awaitTermination()
>  }
> }
> {code}
>  
> As mentioned in Structured streaming doc, {{startingOffsets}}  need to be set for auto.offset.reset.
> [https://spark.apache.org/docs/2.3.1/structured-streaming-kafka-integration.html]
>  * *auto.offset.reset*: Set the source option {{startingOffsets}} to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that {{startingOffsets}} only applies when a new streaming query is started, and that resuming will always pick up from where the query left off.
> During runtime , kafka messages are picked from the latest offset , so functional wise it is working as expected. Only log is misleading as it logs  auto.offset.reset = *earliest* .



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org