Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2020/01/30 18:04:00 UTC

[jira] [Updated] (SPARK-29639) Spark Kafka connector 0.10.0 generates incorrect end offsets for micro batch

     [ https://issues.apache.org/jira/browse/SPARK-29639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-29639:
----------------------------------
    Target Version/s:   (was: 2.4.4, 2.4.5)

> Spark Kafka connector 0.10.0 generates incorrect end offsets for micro batch
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-29639
>                 URL: https://issues.apache.org/jira/browse/SPARK-29639
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Abhinav Choudhury
>            Priority: Major
>
> We have been running a Spark Structured Streaming job in production for more than a week now. Put simply, it reads data from a source Kafka topic (with 4 partitions) and writes to another Kafka topic. Everything had been running fine until the job started failing with the following error:
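> 
> For reference, the job has roughly the following shape. This is a minimal sketch, not our actual production code: the broker address, sink topic, and checkpoint path are placeholders, and the real transformation runs inside foreachBatch (matching the ForeachBatchSink in the progress logs below):
> {noformat}
> import org.apache.spark.sql.DataFrame
> 
> spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder brokers
>   .option("subscribe", "kafka-topic-name")
>   .load()
>   .writeStream
>   .foreachBatch { (batch: DataFrame, batchId: Long) =>
>     // the real job computes metrics here before writing back out
>     batch.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>       .write
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "broker1:9092")
>       .option("topic", "output-topic-name")            // placeholder sink topic
>       .save()
>   }
>   .option("checkpointLocation", "/checkpoints/MetricComputer")  // placeholder path
>   .start()
> {noformat}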
>  
> {noformat}
> Driver stacktrace:
>  === Streaming Query ===
>  Identifier: MetricComputer [id = af26bab1-0d89-4766-934e-ad5752d6bb08, runId = 613a21ad-86e3-4781-891b-17d92c18954a]
>  Current Committed Offsets: {KafkaV2[Subscribe[kafka-topic-name]]: {"kafka-topic-name":
> {"2":10458347,"1":10460151,"3":10475678,"0":9809564}
> }}
>  Current Available Offsets: {KafkaV2[Subscribe[kafka-topic-name]]: {"kafka-topic-name":
> {"2":10458347,"1":10460151,"3":10475678,"0":10509527}
> }}
> Current State: ACTIVE
>  Thread State: RUNNABLE
> <-- Removed Logical plan -->
>  Some data may have been lost because they are not available in Kafka any more; either the
>  data was aged out by Kafka or the topic may have been deleted before all the data in the
>  topic was processed. If you don't want your streaming query to fail on such cases, set the
>  source option "failOnDataLoss" to "false".{noformat}
> Configuration:
> {noformat}
> Spark 2.4.0
> Spark-sql-kafka 0.10{noformat}
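> The error message suggests setting the source option "failOnDataLoss" to "false". For reference, that option goes on the Kafka source; a minimal sketch (broker address is a placeholder; "true" is the default, and "false" makes the query skip missing offsets instead of failing):
> {noformat}
> spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder brokers
>   .option("subscribe", "kafka-topic-name")
>   .option("failOnDataLoss", "false")                  // default is "true"
>   .load()
> {noformat}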
> Looking at the Spark Structured Streaming query progress logs, it seems that the endOffsets computed for the next batch were actually smaller than the starting offsets:
> *Microbatch Trigger 1:*
> {noformat}
> 2019/10/26 23:53:51 INFO utils.Logging[26]: 2019-10-26 23:53:51.767 : ( : Query {
>   "id" : "99fe6c51-9f4a-4f6f-92d3-3c336ef5e06b",
>   "runId" : "2d20d633-2768-446c-845b-893243361422",
>   "name" : "StreamingProcessorName",
>   "timestamp" : "2019-10-26T23:53:51.741Z",
>   "batchId" : 2145898,
>   "numInputRows" : 0,
>   "inputRowsPerSecond" : 0.0,
>   "processedRowsPerSecond" : 0.0,
>   "durationMs" : {
>     "getEndOffset" : 0,
>     "setOffsetRange" : 9,
>     "triggerExecution" : 9
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
>     "description" : "KafkaV2[Subscribe[kafka-topic-name]]",
>     "startOffset" : {
>       "kafka-topic-name" : {
>         "2" : 10452513,
>         "1" : 10454326,
>         "3" : 10469196,
>         "0" : 10503762
>       }
>     },
>     "endOffset" : {
>       "kafka-topic-name" : {
>         "2" : 10452513,
>         "1" : 10454326,
>         "3" : 10469196,
>         "0" : 10503762
>       }
>     },
>     "numInputRows" : 0,
>     "inputRowsPerSecond" : 0.0,
>     "processedRowsPerSecond" : 0.0
>   } ],
>   "sink" : {
>     "description" : "ForeachBatchSink"
>   }
> } in progress{noformat}
> *Next microbatch trigger:*
> {noformat}
> 2019/10/26 23:53:53 INFO utils.Logging[26]: 2019-10-26 23:53:53.951 : ( : Query {
>   "id" : "99fe6c51-9f4a-4f6f-92d3-3c336ef5e06b",
>   "runId" : "2d20d633-2768-446c-845b-893243361422",
>   "name" : "StreamingProcessorName",
>   "timestamp" : "2019-10-26T23:53:52.907Z",
>   "batchId" : 2145898,
>   "numInputRows" : 0,
>   "inputRowsPerSecond" : 0.0,
>   "processedRowsPerSecond" : 0.0,
>   "durationMs" : {
>     "addBatch" : 350,
>     "getBatch" : 4,
>     "getEndOffset" : 0,
>     "queryPlanning" : 102,
>     "setOffsetRange" : 24,
>     "triggerExecution" : 1043,
>     "walCommit" : 349
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
>     "description" : "KafkaV2[Subscribe[kafka-topic-name]]",
>     "startOffset" : {
>       "kafka-topic-name" : {
>         "2" : 10452513,
>         "1" : 10454326,
>         "3" : 10469196,
>         "0" : 10503762
>       }
>     },
>     "endOffset" : {
>       "kafka-topic-name" : {
>         "2" : 10452513,
>         "1" : 10454326,
>         "3" : 9773098,
>         "0" : 10503762
>       }
>     },
>     "numInputRows" : 0,
>     "inputRowsPerSecond" : 0.0,
>     "processedRowsPerSecond" : 0.0
>   } ],
>   "sink" : {
>     "description" : "ForeachBatchSink"
>   }
> } in progress{noformat}
> Notice that for partition 3 of the Kafka topic, the endOffset (9773098) is actually smaller than the startOffset (10469196)! Both progress entries above are for the same batchId, 2145898.
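> A rough sketch of a StreamingQueryListener that would flag this condition on every progress event; it parses the same start/end offset JSON shown in the logs above (json4s ships with Spark):
> {noformat}
> import org.apache.spark.sql.streaming.StreamingQueryListener
> import org.apache.spark.sql.streaming.StreamingQueryListener._
> import org.json4s.{DefaultFormats, Formats}
> import org.json4s.jackson.JsonMethods.parse
> 
> // Flags any source whose endOffset went backwards relative to startOffset.
> class OffsetRegressionListener extends StreamingQueryListener {
>   implicit val formats: Formats = DefaultFormats
> 
>   // Kafka source offsets serialize as {"topic": {"partition": offset}}
>   private def offsets(json: String): Map[String, Map[String, Long]] =
>     parse(json).extract[Map[String, Map[String, Long]]]
> 
>   override def onQueryStarted(event: QueryStartedEvent): Unit = ()
>   override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
> 
>   override def onQueryProgress(event: QueryProgressEvent): Unit =
>     for (src <- event.progress.sources
>          if src.startOffset != null && src.endOffset != null) {
>       val starts = offsets(src.startOffset)
>       for {
>         (topic, parts) <- offsets(src.endOffset)
>         (part, endOff) <- parts
>         startOff <- starts.getOrElse(topic, Map.empty[String, Long]).get(part)
>         if endOff < startOff
>       } println(s"batch ${event.progress.batchId}: $topic/$part " +
>                 s"end offset $endOff < start offset $startOff")
>     }
> }
> 
> // spark.streams.addListener(new OffsetRegressionListener)
> {noformat}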
> We checked the HDFS checkpoint dir, and the checkpointed offsets look fine; they point to the last committed offsets.
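>  For reference, Spark stores one offset file per batch under <checkpointLocation>/offsets/<batchId>, so the checkpointed offsets can be dumped directly. A sketch (the checkpoint path is a placeholder; the batchId is the one from the logs above):
> {noformat}
> import org.apache.hadoop.fs.{FileSystem, Path}
> import scala.io.Source
> 
> // Dump the offset log entry for one batch.
> val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
> val in = fs.open(new Path("/checkpoints/MetricComputer/offsets/2145898"))  // placeholder path
> try Source.fromInputStream(in).getLines().foreach(println)
> finally in.close()
> {noformat}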
>  Why is the end offset for a partition being computed as a smaller value than its start offset?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org