Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2020/01/30 18:04:00 UTC
[jira] [Updated] (SPARK-29639) Spark Kafka connector 0.10.0 generates incorrect end offsets for micro batch
[ https://issues.apache.org/jira/browse/SPARK-29639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-29639:
----------------------------------
Target Version/s: (was: 2.4.4, 2.4.5)
> Spark Kafka connector 0.10.0 generates incorrect end offsets for micro batch
> ----------------------------------------------------------------------------
>
> Key: SPARK-29639
> URL: https://issues.apache.org/jira/browse/SPARK-29639
> Project: Spark
> Issue Type: Bug
> Components: Input/Output, Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Abhinav Choudhury
> Priority: Major
>
> We have been running a Spark Structured Streaming job in production for more than a week now. Put simply, it reads data from source Kafka topics (with 4 partitions) and writes to another Kafka topic. Everything had been running fine until the job started failing with the following error:
>
> {noformat}
> Driver stacktrace:
> === Streaming Query ===
> Identifier: MetricComputer [id = af26bab1-0d89-4766-934e-ad5752d6bb08, runId = 613a21ad-86e3-4781-891b-17d92c18954a]
> Current Committed Offsets: {KafkaV2[Subscribe[kafka-topic-name]]: {"kafka-topic-name":
> {"2":10458347,"1":10460151,"3":10475678,"0":9809564}
> }}
> Current Available Offsets: {KafkaV2[Subscribe[kafka-topic-name]]: {"kafka-topic-name":
> {"2":10458347,"1":10460151,"3":10475678,"0":10509527}
> }}
> Current State: ACTIVE
> Thread State: RUNNABLE
> <-- Removed Logical plan -->
> Some data may have been lost because they are not available in Kafka any more; either the
> data was aged out by Kafka or the topic may have been deleted before all the data in the
> topic was processed. If you don't want your streaming query to fail on such cases, set the
> source option "failOnDataLoss" to "false".{noformat}
> Configuration:
> {noformat}
> Spark 2.4.0
> Spark-sql-kafka 0.10{noformat}
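> As the error message notes, this failure mode can be suppressed (at the cost of silently skipping the missing offset range) via the source option it names. A minimal sketch of the reader configuration, assuming PySpark; the topic name and bootstrap address are placeholders, not values from this issue:

```python
# Hypothetical sketch of the Kafka source configuration (PySpark).
# "kafka-topic-name" and "broker1:9092" are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MetricComputer").getOrCreate()

stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")
    .option("subscribe", "kafka-topic-name")
    # "false" makes the query log a warning instead of failing when
    # requested offsets are no longer available; the default is "true".
    .option("failOnDataLoss", "false")
    .load()
)
```

Note this only hides the symptom reported here; it does not explain why the computed end offset went backwards.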
> Looking at the Spark Structured Streaming query progress logs, it seems the endOffsets computed for the next batch were actually smaller than the starting offsets:
> *Microbatch Trigger 1:*
> {noformat}
> 2019/10/26 23:53:51 INFO utils.Logging[26]: 2019-10-26 23:53:51.767 : ( : Query {
> "id" : "99fe6c51-9f4a-4f6f-92d3-3c336ef5e06b",
> "runId" : "2d20d633-2768-446c-845b-893243361422",
> "name" : "StreamingProcessorName",
> "timestamp" : "2019-10-26T23:53:51.741Z",
> "batchId" : 2145898,
> "numInputRows" : 0,
> "inputRowsPerSecond" : 0.0,
> "processedRowsPerSecond" : 0.0,
> "durationMs" : {
> "getEndOffset" : 0,
> "setOffsetRange" : 9,
> "triggerExecution" : 9
> },
> "stateOperators" : [ ],
> "sources" : [ {
> "description" : "KafkaV2[Subscribe[kafka-topic-name]]",
> "startOffset" : {
> "kafka-topic-name" : {
> "2" : 10452513,
> "1" : 10454326,
> "3" : 10469196,
> "0" : 10503762
> }
> },
> "endOffset" : {
> "kafka-topic-name" : {
> "2" : 10452513,
> "1" : 10454326,
> "3" : 10469196,
> "0" : 10503762
> }
> },
> "numInputRows" : 0,
> "inputRowsPerSecond" : 0.0,
> "processedRowsPerSecond" : 0.0
> } ],
> "sink" : {
> "description" : "ForeachBatchSink"
> }
> } in progress{noformat}
> *Next micro batch trigger:*
> {noformat}
> 2019/10/26 23:53:53 INFO utils.Logging[26]: 2019-10-26 23:53:53.951 : ( : Query {
> "id" : "99fe6c51-9f4a-4f6f-92d3-3c336ef5e06b",
> "runId" : "2d20d633-2768-446c-845b-893243361422",
> "name" : "StreamingProcessorName",
> "timestamp" : "2019-10-26T23:53:52.907Z",
> "batchId" : 2145898,
> "numInputRows" : 0,
> "inputRowsPerSecond" : 0.0,
> "processedRowsPerSecond" : 0.0,
> "durationMs" : {
> "addBatch" : 350,
> "getBatch" : 4,
> "getEndOffset" : 0,
> "queryPlanning" : 102,
> "setOffsetRange" : 24,
> "triggerExecution" : 1043,
> "walCommit" : 349
> },
> "stateOperators" : [ ],
> "sources" : [ {
> "description" : "KafkaV2[Subscribe[kafka-topic-name]]",
> "startOffset" : {
> "kafka-topic-name" : {
> "2" : 10452513,
> "1" : 10454326,
> "3" : 10469196,
> "0" : 10503762
> }
> },
> "endOffset" : {
> "kafka-topic-name" : {
> "2" : 10452513,
> "1" : 10454326,
> "3" : 9773098,
> "0" : 10503762
> }
> },
> "numInputRows" : 0,
> "inputRowsPerSecond" : 0.0,
> "processedRowsPerSecond" : 0.0
> } ],
> "sink" : {
> "description" : "ForeachBatchSink"
> }
> } in progress{noformat}
> Notice that for partition 3 of the Kafka topic, the endOffset is actually smaller than the startOffset!
> We checked the HDFS checkpoint dir, and the checkpointed offsets look fine and point to the last committed offsets.
> Why is the end offset for a partition being computed to a smaller value?
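> The regression above can be spotted mechanically from the query-progress JSON. A small sketch in plain Python (the offset values are copied from the second micro-batch trigger quoted in this report) that flags any partition whose end offset moved backwards relative to its start offset:

```python
# Sketch: detect non-monotonic offset ranges in a streaming progress
# report. Offsets below are from the second micro-batch quoted above;
# partition "3" regressed from 10469196 to 9773098.
start_offset = {"2": 10452513, "1": 10454326, "3": 10469196, "0": 10503762}
end_offset   = {"2": 10452513, "1": 10454326, "3": 9773098,  "0": 10503762}

def regressed_partitions(start, end):
    """Return partitions whose end offset is below their start offset."""
    return sorted(p for p in start if end.get(p, start[p]) < start[p])

print(regressed_partitions(start_offset, end_offset))  # ['3']
```

A check like this over the `sources[].startOffset`/`endOffset` fields of each progress event would have caught the bad batch before it tripped the `failOnDataLoss` error.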
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)