Posted to issues@spark.apache.org by "Sandish Kumar HN (Jira)" <ji...@apache.org> on 2019/10/29 16:56:00 UTC

[jira] [Commented] (SPARK-29625) Spark Structured Streaming Kafka wrongly resets offset twice

    [ https://issues.apache.org/jira/browse/SPARK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16962228#comment-16962228 ] 

Sandish Kumar HN commented on SPARK-29625:
------------------------------------------

[~tdas] It would be very helpful if you could review this.

> Spark Structured Streaming Kafka wrongly resets offset twice
> ------------------------------------------------------------
>
>                 Key: SPARK-29625
>                 URL: https://issues.apache.org/jira/browse/SPARK-29625
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.1
>            Reporter: Sandish Kumar HN
>            Priority: Major
>
> Spark Structured Streaming's Kafka source resets offsets twice: the first time to the correct offsets, and the second time to very old offsets.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-151 to offset 0.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-118 to offset 0.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-85 to offset 0.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-52 to offset 122677634.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-19 to offset 0.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-52 to offset 120504922.
> [2019-10-28 19:27:40,153] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO ContextCleaner: Cleaned accumulator 810
> The second, stale reset for partition topic-52 is what causes the data-loss failure below (sketches of a minimal query and of the failOnDataLoss workaround follow the quoted trace).
>  
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 ERROR StreamExecution: Query [id = d62ca9e4-6650-454f-8691-a3d576d1e4ba, runId = 3946389f-222b-495c-9ab2-832c0422cbbb] terminated with error
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - java.lang.IllegalStateException: Partition topic-52's offset was changed from 122677598 to 120504922, some data may have been missed.
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - Some data may have been lost because they are not available in Kafka any more; either the
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO -  data was aged out by Kafka or the topic may have been deleted before all the data in the
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO -  topic was processed. If you don't want your streaming query to fail on such cases, set the
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO -  source option "failOnDataLoss" to "false".
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at org.apache.spark.sql.kafka010.KafkaSource.org$apache$spark$sql$kafka010$KafkaSource$$reportDataLoss(KafkaSource.scala:329)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:283)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:281)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:281)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$7.apply(StreamExecution.scala:614)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$7.apply(StreamExecution.scala:610)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
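
For context, the kind of query that exercises this code path looks like the sketch below. This is an assumption about how such a job is typically wired up, not the reporter's actual code: the broker address, topic name, output path, and checkpoint location are all placeholders.

    // Hypothetical minimal query; all option values are placeholders,
    // not taken from the original report.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    object KafkaOffsetResetRepro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("kafka-offset-reset-repro")
          .getOrCreate()

        // failOnDataLoss defaults to true, so when the resolved offset for a
        // partition moves backwards between two resets (as in the log above),
        // the query fails with the IllegalStateException quoted in the report.
        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
          .option("subscribe", "topic")                      // "topic-52" in the log is partition 52 of this topic
          .option("startingOffsets", "latest")
          .load()

        val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .writeStream
          .format("parquet")
          .option("path", "/tmp/out")                        // placeholder
          .option("checkpointLocation", "/tmp/checkpoint")   // placeholder
          .trigger(Trigger.ProcessingTime("30 seconds"))
          .start()

        query.awaitTermination()
      }
    }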
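The error message itself names the source option "failOnDataLoss". Setting it to "false" keeps the query running when offsets move backwards or data is aged out, but it only suppresses the exception: the records between offsets 120504922 and 122677598 on topic-52 are skipped, not recovered, so it is a mitigation rather than a fix for the double reset. A hedged sketch, reusing the placeholder broker and topic from the previous example:

    // Workaround named in the error message: do not fail the query on data loss.
    // The missing offset range is silently skipped, so data loss still occurs.
    val tolerant = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
      .option("subscribe", "topic")                      // placeholder
      .option("failOnDataLoss", "false")
      .load()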



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org