You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sandish Kumar HN (Jira)" <ji...@apache.org> on 2019/11/07 04:04:00 UTC

[jira] [Reopened] (SPARK-29625) Spark Structure Streaming Kafka Wrong Reset Offset twice

     [ https://issues.apache.org/jira/browse/SPARK-29625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sandish Kumar HN reopened SPARK-29625:
--------------------------------------

It happened again, As I can't share the code here, I will just post the flow of the code, 

Look at partition-32 at [2019-11-06 04:33:40,013]

And at [2019-11-06 04:33:40,015]

Why is Spark is trying to reset twice within the same batch? one with right offset and another with the wrong offset.

------------------------------------------------------------------------------------
df-reader=spark.read
df.transfer=spark.map (spark transforms functions)
df-stream-writer=spark.readStream
df-stream-writer=spark.writeStream

----------------------------SourceCode Flow ----------------------------------

analytics-session:
 type: spark
 name: etl-flow-correlate
 options: (
 "spark.sql.shuffle.partitions" : "2" ;
 "spark.sql.windowExec.buffer.spill.threshold" : "98304"
 )

df-reader:
 name: ipmac
 format: jdbc
 options: (
 "url" : "postgres.jdbc.url" ;
 "driver" : "org.postgresql.Driver" ;
 "dbtable" : "tablename"
 )

df-transform:
 name: ColumnRename
 transform: alias
 input: ipmac
 options: (
 "id" : "key" ;
 "ip" : "src_ip"
 )

df-stream-reader:
 name: flow
 format: kafka
 options: (
 "kafka.bootstrap.servers": "kafka.server" ;
 "subscribe": "topic name" ;
 "maxOffsetsPerTrigger": "400000" ;
 "startingOffsets": "latest" ;
 "failOnDataLoss": "true"
 )

df-transform:
 name: decodedFlow
 transform: EventDecode
 input: flow
 options: (
 "schema" : "flow"
 )

df-transform:
 name: flowWithDate
 transform: convertTimestampCols
 input: decodedFlow
 options: (
 "timestampcols" : "start_time,end_time" ;
 "resolution" : "microsec"
 )

df-transform:
 name: correlatedFlow
 transform: correlateOverWindow
 input: flowWithDate
 options: (
 "rightDF" : "ipmacColumnRename" ;
 "timestampcol" : "start_time" ;
 "timewindow" : "1 minute" ;
 "join-col" : "key,src_ip" ;
 "expires-min" : "expires_in_min"
 )

df-transform:
 name: nonNullFlow
 transform: filter
 input: correlatedFlow
 options: (
 "notnull" : "mac"
 )

df-transform:
 name: flowColumnRename
 transform: alias
 input: nonNullFlow
 options: (
 "key" : "tid"
 )

df-transform:
 name: flowEncoded
 transform: EventEncode
 input: flowColumnRename
 options: (
 "key" : "id"
 )

df-stream-writer:
 name: flowEncoded
 format: kafka
 options: (
 "kafka.bootstrap.servers" : "kafka.server" ;
 "checkpointLocation" : "spark.checkpoint.flow-correlation" ;
 "trigger-processing" : "10 seconds" ;
 "query-duration" : "60 minutes" ;
 "topic" : "topicname"
 )

----------------------------Logs ------------------------------------------------------------
2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-185 to offset 0.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-32 to offset 16395511.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-65 to offset 0.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-98 to offset 0.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-131 to offset 0.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-164 to offset 1684812.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-197 to offset 0.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-11 to offset 10372315.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-44 to offset 593757072.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-77 to offset 0.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-110 to offset 0.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-143 to offset 3604761.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-176 to offset 0.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-23 to offset 1411947.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-56 to offset 20028.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-89 to offset 0.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-122 to offset 0.
[2019-11-06 04:33:40,013] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-155 to offset 0.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-188 to offset 0.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-2 to offset 1497504.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-35 to offset 170674.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-68 to offset 0.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-101 to offset 6061319.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-134 to offset 434270.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-167 to offset 0.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-14 to offset 27453734.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-47 to offset 0.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-80 to offset 3886244047.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-113 to offset 47673876.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-146 to offset 69143474.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-179 to offset 1911402.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-26 to offset 0.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-59 to offset 1420.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-92 to offset 0.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-125 to offset 0.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-158 to offset 0.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-191 to offset 404803.
[2019-11-06 04:33:40,014] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-5 to offset 0.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-38 to offset 13017209.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-71 to offset 0.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-104 to offset 40213846.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-137 to offset 1320323.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-170 to offset 24501362.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-17 to offset 8097944.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-50 to offset 11641614.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-83 to offset 16773015.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-116 to offset 0.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-149 to offset 0.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-182 to offset 558395221.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-29 to offset 5006957.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-62 to offset 0.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-95 to offset 6231.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-128 to offset 2803241.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-161 to offset 11851717.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-194 to offset 421665.
[2019-11-06 04:33:40,015] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-57ea8170-0f95-4724-8168-1f9347b74661--1740056283-driver-0] Resetting offset for partition collector.endpoint.flow-32 to offset 13982042.
[2019-11-06 04:33:40,707] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO StreamExecution: Committed offsets for batch 179995. Metadata OffsetSeqMetadata(0,1573014820010,Map(spark.sql.shuffle.partitions -> 2))
[2019-11-06 04:33:40,904] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO KafkaSource: GetBatch called with start = Some(\{"collector.endpoint.flow":{"137":1320304,"146":69143455,"92":0,"101":6061319,"173":0,"182":558395056,"191":404803,"83":16773015,"155":0,"164":1684812,"110":0,"119":0,"128":2803241,"95":6231,"23":1411947,"104":40213846,"131":0,"122":0,"194":421665,"176":0,"77":0,"86":0,"158":0,"185":0,"167":0,"50":11641614,"59":1420,"68":0,"32":16395369,"41":0,"113":47673434,"140":0,"149":0,"53":0,"62":0,"134":434270,"35":170674,"8":3033607,"44":593756300,"17":8097944,"26":0,"125":0,"89":0,"80":3886243960,"116":0,"71":0,"98":0,"107":0,"11":10372315,"74":4502099,"56":20028,"29":5006956,"38":13017209,"47":0,"20":0,"2":1497504,"65":0,"5":0,"184":0,"193":0,"175":95,"166":28711213,"14":27453734,"142":0,"151":4091,"124":0,"106":0,"133":238220,"160":0,"169":12257580,"178":8085850,"115":0,"187":205435831,"196":0,"181":0,"163":0,"172":0,"46":0,"118":89298234,"127":0,"199":0,"136":0,"100":688763285,"82":2168299,"109":0,"91":0,"190":562857,"64":0,"55":2823882,"73":32060386,"145":0,"154":0,"85":0,"94":0,"67":42490756,"58":0,"139":0,"40":17263,"49":4491462,"130":259003,"4":0,"13":0,"121":16455127,"148":80307763,"157":0,"22":0,"31":21617060,"76":2267985312,"103":0,"112":0,"16":0,"97":3357,"7":114729,"79":224706081,"88":1521872,"70":3599,"52":10031237,"43":0,"25":7577657,"34":0,"61":0,"10":9766410,"189":77373,"37":0,"180":0,"1":0,"198":0,"19":0,"28":52777,"147":0,"174":0,"156":0,"183":45017559,"129":0,"138":0,"120":3272084,"165":121886562,"192":3904007,"60":82897002,"186":0,"87":0,"96":117437927,"168":0,"177":0,"150":0,"132":0,"159":0,"141":0,"105":0,"114":0,"123":0,"195":0,"78":537811048,"69":0,"90":0,"63":0,"99":0,"45":10398861,"54":9977645,"171":0,"72":0,"81":53055506,"144":0,"126":2111278,"153":0,"162":0,"27":0,"36":0,"108":11516984,"135":7059122,"117":0,"18":0,"9":0,"48":0,"57":9386973,"21":13028341,"3":6022016,"84":10,"12":0,"93":107502,"102":31806328,"75":82719392,"30":0,"39":95262599,"111":0,"66":0,"15":0,"42":135251396,"51":302205,"24":0,"33":183248704,"6":1029373,"152":0,"179":1911395,"188":0,"143":3604761,"170":24501324,"161":11851717,"197":0,"0":27843849}}), end = \{"collector.endpoint.flow":{"137":1320323,"146":69143474,"92":0,"101":6061319,"173":0,"182":558395221,"191":404803,"83":16773015,"155":0,"164":1684812,"110":0,"119":0,"128":2803241,"95":6231,"23":1411947,"104":40213846,"131":0,"122":0,"194":421665,"176":0,"77":0,"86":0,"158":0,"185":0,"167":0,"50":11641614,"59":1420,"68":0,"32":13982042,"41":0,"113":47673876,"140":0,"149":0,"53":0,"62":0,"134":434270,"35":170674,"8":3033607,"44":593757072,"17":8097944,"26":0,"125":0,"89":0,"80":3886244047,"116":0,"71":0,"98":0,"107":0,"11":10372315,"74":4502099,"56":20028,"29":5006957,"38":13017209,"47":0,"20":0,"2":1497504,"65":0,"5":0,"184":0,"193":0,"175":95,"166":28711213,"14":27453734,"142":0,"151":4091,"124":0,"106":0,"133":238220,"160":0,"169":12257580,"178":8085869,"115":0,"187":205436005,"196":0,"181":0,"163":0,"172":0,"46":0,"118":89298322,"127":0,"199":0,"136":0,"100":688763285,"82":2168299,"109":0,"91":0,"190":562857,"64":0,"55":2823882,"73":32060386,"145":0,"154":0,"85":0,"94":0,"67":42490756,"58":0,"139":0,"40":17263,"49":4491462,"130":259003,"4":0,"13":0,"121":16455127,"148":80307898,"157":0,"22":0,"31":21617060,"76":2267994373,"103":0,"112":0,"16":0,"97":3357,"7":114729,"79":224710443,"88":1521872,"70":3599,"52":10031237,"43":0,"25":7577657,"34":0,"61":0,"10":9766428,"189":77373,"37":0,"180":0,"1":0,"198":0,"19":0,"28":52777,"147":0,"174":0,"156":0,"183":45017559,"129":0,"138":0,"120":3272084,"165":121886562,"192":3904007,"60":82897014,"186":0,"87":0,"96":117437927,"168":0,"177":0,"150":0,"132":0,"159":0,"141":0,"105":0,"114":0,"123":0,"195":0,"78":537812520,"69":0,"90":0,"63":0,"99":0,"45":10398861,"54":9977645,"171":0,"72":0,"81":53055506,"144":0,"126":2111278,"153":0,"162":0,"27":0,"36":0,"108":11516984,"135":7059122,"117":0,"18":0,"9":0,"48":0,"57":9387013,"21":13028341,"3":6022056,"84":10,"12":0,"93":107502,"102":31806328,"75":82719392,"30":0,"39":95262599,"111":0,"66":0,"15":0,"42":135251396,"51":302205,"24":0,"33":183248776,"6":1029373,"152":0,"179":1911402,"188":0,"143":3604761,"170":24501362,"161":11851717,"197":0,"0":27843849}}
[2019-11-06 04:33:40,908] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 INFO KafkaSource: Partitions added: Map()
[2019-11-06 04:33:40,936] \{bash_operator.py:128} INFO - 19/11/06 04:33:40 ERROR StreamExecution: Query [id = 0f3bc878-23ba-4dbc-9745-74f4953c8ced, runId = a05076ce-2b6a-45c5-bd09-094750a439e2] terminated with error
[2019-11-06 04:33:40,936] \{bash_operator.py:128} INFO - java.lang.IllegalStateException: Partition collector.endpoint.flow-32's offset was changed from 16395369 to 13982042, some data may have been missed.
[2019-11-06 04:33:40,936] \{bash_operator.py:128} INFO - Some data may have been lost because they are not available in Kafka any more; either the
[2019-11-06 04:33:40,936] \{bash_operator.py:128} INFO - data was aged out by Kafka or the topic may have been deleted before all the data in the
[2019-11-06 04:33:40,936] \{bash_operator.py:128} INFO - topic was processed. If you don't want your streaming query to fail on such cases, set the
[2019-11-06 04:33:40,936] \{bash_operator.py:128} INFO - source option "failOnDataLoss" to "false".
[2019-11-06 04:33:40,936] \{bash_operator.py:128} INFO -

> Spark Structure Streaming Kafka Wrong Reset Offset twice
> --------------------------------------------------------
>
>                 Key: SPARK-29625
>                 URL: https://issues.apache.org/jira/browse/SPARK-29625
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.1
>            Reporter: Sandish Kumar HN
>            Priority: Major
>
> Spark Structure Streaming Kafka Reset Offset twice, once with right offsets and second time with very old offsets 
> {code}
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-151 to offset 0.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-118 to offset 0.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-85 to offset 0.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-52 to offset 122677634.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-19 to offset 0.
> [2019-10-28 19:27:40,013] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO Fetcher: [Consumer clientId=consumer-1, groupId=spark-kafka-source-cfacf6b7-b0aa-443f-b01d-b17212087545--1376165614-driver-0] Resetting offset for partition topic-52 to offset 120504922.*
> [2019-10-28 19:27:40,153] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 INFO ContextCleaner: Cleaned accumulator 810
> {code}
> which is causing a Data loss issue.  
> {code}
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 19/10/28 19:27:40 ERROR StreamExecution: Query [id = d62ca9e4-6650-454f-8691-a3d576d1e4ba, runId = 3946389f-222b-495c-9ab2-832c0422cbbb] terminated with error
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - java.lang.IllegalStateException: Partition topic-52's offset was changed from 122677598 to 120504922, some data may have been missed.
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - Some data may have been lost because they are not available in Kafka any more; either the
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO -  data was aged out by Kafka or the topic may have been deleted before all the data in the
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO -  topic was processed. If you don't want your streaming query to fail on such cases, set the
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO -  source option "failOnDataLoss" to "false".
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at org.apache.spark.sql.kafka010.KafkaSource.org$apache$spark$sql$kafka010$KafkaSource$$reportDataLoss(KafkaSource.scala:329)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:283)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:281)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:281)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$7.apply(StreamExecution.scala:614)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$7.apply(StreamExecution.scala:610)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> [2019-10-28 19:27:40,351] \{bash_operator.py:128} INFO - 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org