Posted to issues@spark.apache.org by "Valeria Vasylieva (JIRA)" <ji...@apache.org> on 2019/02/20 14:07:00 UTC

[jira] [Commented] (SPARK-25810) Spark structured streaming logs auto.offset.reset=earliest even though startingOffsets is set to latest

    [ https://issues.apache.org/jira/browse/SPARK-25810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16773044#comment-16773044 ] 

Valeria Vasylieva commented on SPARK-25810:
-------------------------------------------

I suppose the cause is here: [KafkaSourceProvider:517|https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L517]
{code:java}
// Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial
// offsets by itself instead of counting on KafkaConsumer.
.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
{code}
But I do not really think this should be fixed: Spark defines a custom algorithm for offset checking and fetching in [KafkaSource|https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala], which lets us avoid Kafka errors for offsets that do not exist, etc.

[ConsumerConfig|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java] belongs to Kafka, so we cannot just change the logging here.
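To see that the logged value is only a client-side detail, here is a minimal standalone sketch (not the actual Spark internals; broker address, group id and topic are made up) of the same pattern KafkaSourceProvider uses: pin auto.offset.reset to a safe value so out-of-range offsets do not throw, and pick the real starting position with an explicit seek:
{code:java}
import java.{util => ju}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object AutoOffsetResetSketch {
  def main(args: Array[String]): Unit = {
    val props = new ju.Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-reset-demo")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    // Pinned to "earliest" the same way KafkaSourceProvider pins it: it only
    // controls what happens on an out-of-range offset, not where reading starts.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      consumer.subscribe(ju.Collections.singletonList("topic1"))
      consumer.poll(0L) // join the group so partitions get assigned
      // The "latest" behaviour comes from an explicit seek, not from the config:
      consumer.seekToEnd(consumer.assignment())
    } finally {
      consumer.close()
    }
  }
}
{code}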

> Spark structured streaming logs auto.offset.reset=earliest even though startingOffsets is set to latest
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-25810
>                 URL: https://issues.apache.org/jira/browse/SPARK-25810
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>            Reporter: ANUJA BANTHIYA
>            Priority: Trivial
>
> I have an issue when I'm trying to read data from Kafka using Spark Structured Streaming.
> Versions: spark-core_2.11 : 2.3.1, spark-sql_2.11 : 2.3.1, spark-sql-kafka-0-10_2.11 : 2.3.1, kafka-client : 0.11.0.0
> The issue I am facing is that the Spark job always logs auto.offset.reset = earliest during application startup, even though startingOffsets is set to latest in the code.
> Code to reproduce: 
> {code:java}
> package com.informatica.exec
>
> import org.apache.spark.sql.SparkSession
>
> object kafkaLatestOffset {
>   def main(s: Array[String]) {
>     val spark = SparkSession
>       .builder()
>       .appName("Spark Offset basic example")
>       .master("local[*]")
>       .getOrCreate()
>     val df = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:9092")
>       .option("subscribe", "topic1")
>       .option("startingOffsets", "latest")
>       .load()
>     val query = df.writeStream
>       .outputMode("append") // "complete" mode fails without aggregations
>       .format("console")
>       .start()
>     query.awaitTermination()
>   }
> }
> {code}
>  
> As mentioned in the Structured Streaming Kafka integration guide, the source option {{startingOffsets}} should be set instead of relying on auto.offset.reset.
> [https://spark.apache.org/docs/2.3.1/structured-streaming-kafka-integration.html]
>  * *auto.offset.reset*: Set the source option {{startingOffsets}} to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that {{startingOffsets}} only applies when a new streaming query is started, and that resuming will always pick up from where the query left off.
> At runtime, Kafka messages are picked up from the latest offset, so functionally it works as expected. Only the log is misleading, since it reports auto.offset.reset = *earliest*. The sketch below illustrates when {{startingOffsets}} actually applies.
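> A minimal sketch (reusing the {{spark}} session from the snippet above; the checkpoint path is a hypothetical placeholder) of how {{startingOffsets}} interacts with checkpointing: the option is honoured only when the query starts with an empty checkpoint directory, and on restart the stored offsets win.
> {code:java}
> val query = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "topic1")
>   .option("startingOffsets", "latest") // used only on the very first start
>   .load()
>   .writeStream
>   .outputMode("append")
>   .format("console")
>   // hypothetical path; on restart, offsets stored here override startingOffsets
>   .option("checkpointLocation", "/tmp/checkpoints/topic1-demo")
>   .start()
> {code}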



