Posted to issues@spark.apache.org by "MariaCarrie (JIRA)" <ji...@apache.org> on 2019/08/13 03:07:00 UTC
[jira] [Comment Edited] (SPARK-28641) MicroBatchExecution committed
offsets greater than available offsets
[ https://issues.apache.org/jira/browse/SPARK-28641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16905761#comment-16905761 ]
MariaCarrie edited comment on SPARK-28641 at 8/13/19 3:06 AM:
--------------------------------------------------------------
Hi [~hyukjin.kwon], I couldn't reproduce this in the test environment, but it happens occasionally in production. I think it may be a Kafka error, because the {{fetchLatestOffsets}} method returned the earliest offsets.
To mitigate this, could I modify the {{dataAvailable}} method like this?
{code}
private def dataAvailable: Boolean = {
  availableOffsets.exists {
    case (source, available) =>
      committedOffsets
        .get(source)
        .forall { committed =>
          // JsonHelper is a JSON utility defined outside this snippet.
          // Both offsets are parsed as topic -> {partition -> offset}.
          val availablePTO = JsonHelper.toObject[Map[String, Any]](available.json())
          val committedPTO = JsonHelper.toObject[Map[String, Any]](committed.json())
          var dataAvailable = true
          availablePTO.foreach { tp =>
            if (committedPTO.contains(tp._1)) {
              val availableTP = JsonHelper.toObject[Map[String, Long]](tp._2)
              val committedTP = JsonHelper.toObject[Map[String, Long]](committedPTO(tp._1))
              availableTP.foreach { t =>
                // A committed offset ahead of the fetched offset means the fetched
                // offsets went backwards, so do not treat this as new data.
                if (committedTP.contains(t._1) && committedTP(t._1) > t._2) {
                  logWarning(s"Offset fallback, commit offsets:[$committedPTO],available offsets:[$availablePTO].")
                  dataAvailable = false
                }
              }
            }
          }
          (committed != available) && dataAvailable
        }
  }
}
{code}
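The proposed check can also be written without the mutable flag and the second JSON parse. This is only a minimal sketch under stated assumptions: offsets are taken as already parsed into a topic -> (partition -> offset) map instead of going through {{JsonHelper}}, and {{OffsetFallbackCheck}}, {{hasFallback}} and {{dataAvailableFor}} are hypothetical names, not Spark APIs.

```scala
// Hypothetical helper object, not part of Spark.
object OffsetFallbackCheck {
  // Assumed representation: topic -> (partition -> offset).
  type Offsets = Map[String, Map[Int, Long]]

  // True if any partition present in both maps has a committed offset
  // greater than the newly fetched (available) offset.
  def hasFallback(committed: Offsets, available: Offsets): Boolean =
    available.exists { case (topic, availableParts) =>
      committed.get(topic).exists { committedParts =>
        availableParts.exists { case (partition, availableOffset) =>
          committedParts.get(partition).exists(_ > availableOffset)
        }
      }
    }

  // Mirrors the proposed dataAvailable for a single source: with no committed
  // offsets there is always data; otherwise the offsets must differ and must
  // not contain any fallback.
  def dataAvailableFor(committed: Option[Offsets], available: Offsets): Boolean =
    committed.forall(c => c != available && !hasFallback(c, available))
}
```

With the offsets from the log in the description (committed 13978260 and fetched 9053058 for partition 4), {{dataAvailableFor}} returns false, so no batch would be planned. One consequence of this design: if the offsets really did go back (for example because the topic was deleted and recreated), the check keeps reporting no new data until no partition's fetched offset is behind its committed offset.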
> MicroBatchExecution committed offsets greater than available offsets
> --------------------------------------------------------------------
>
> Key: SPARK-28641
> URL: https://issues.apache.org/jira/browse/SPARK-28641
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.3.1
> Environment: HDP --> 3.0.0
> Spark --> 2.3.1
> Kafka --> 2.1.1
> Reporter: MariaCarrie
> Priority: Major
> Labels: MicroBatchExecution, dataAvailable
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> I use Structured Streaming to consume Kafka data, with the default trigger type and checkpointing enabled. Looking at the log, I found that Structured Streaming went back to offsets it had already processed. The application log is as follows:
>
> {code}
> 19/07/31 15:25:50 INFO KafkaSource: GetBatch called with start = Some({"dop_dvi_formatted-send_pus":{"2":13978245,"4":13978260,"1":13978249,"3":13978233,"0":13978242}}), end = {"dop_dvi_formatted-send_pus":{"2":13978245,"4":9053058,"1":13978249,"3":13978233,"0":13978242}}
> 19/07/31 15:25:50 INFO KafkaSource: Partitions added: Map()
> 19/07/31 15:25:50 WARN KafkaSource: Partition dop_dvi_formatted-send_pus-4's offset was changed from 13978260 to 9053058, some data may have been missed.
> Some data may have been lost because they are not available in Kafka any more; either the
> data was aged out by Kafka or the topic may have been deleted before all the data in the
> topic was processed. If you want your streaming query to fail on such cases, set the source
> option "failOnDataLoss" to "true".
> {code}
>
> I see that after the {{latestOffsets}} are fetched, they are compared with the {{committedOffsets}} to decide whether there is {{newData}}.
>
> {code}
> private def dataAvailable: Boolean = {
>   availableOffsets.exists {
>     case (source, available) =>
>       committedOffsets.get(source).map(committed => committed != available).getOrElse(true)
>   }
> }
> {code}
>
> I suspect that something went wrong in Kafka, causing the {{fetchLatestOffsets}} method to return the {{earliestOffsets}}, even though the data had already been successfully processed and committed. Could this be detected in the {{dataAvailable}} method, so that if the {{availableOffsets}} have already been committed, the batch is no longer marked as new data?
> I am not sure whether my reasoning is correct. If processing continues from the {{earliestOffsets}}, Structured Streaming cannot keep up in a timely manner. Any suggestions are welcome!
>
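To illustrate the behavior described in the issue, here is a sketch that plugs the offsets from the log into a check with the same shape as the quoted {{dataAvailable}}. It assumes the GetBatch start offsets are the committed ones and models the single topic as a partition -> offset map; {{OriginalCheckDemo}}, {{originalDataAvailable}} and {{regressedPartitions}} are hypothetical names, not Spark APIs.

```scala
// Hypothetical demo object, not part of Spark.
object OriginalCheckDemo {
  // Offsets for topic dop_dvi_formatted-send_pus, copied from the GetBatch log
  // line. Assumption: the batch's start offsets are the committed ones.
  val start: Map[Int, Long] =
    Map(2 -> 13978245L, 4 -> 13978260L, 1 -> 13978249L, 3 -> 13978233L, 0 -> 13978242L)
  val end: Map[Int, Long] =
    Map(2 -> 13978245L, 4 -> 9053058L, 1 -> 13978249L, 3 -> 13978233L, 0 -> 13978242L)

  // Same shape as the quoted check for one source: any difference,
  // in either direction, counts as new data.
  def originalDataAvailable(committed: Option[Map[Int, Long]], available: Map[Int, Long]): Boolean =
    committed.map(c => c != available).getOrElse(true)

  // Partitions whose fetched offset is behind the committed one,
  // as (partition, committed, available), sorted by partition.
  def regressedPartitions(committed: Map[Int, Long], available: Map[Int, Long]): List[(Int, Long, Long)] =
    available.toList.sortBy(_._1).collect {
      case (p, a) if committed.get(p).exists(_ > a) => (p, committed(p), a)
    }
}
```

Here {{originalDataAvailable(Some(start), end)}} is true even though partition 4 moved backwards, because inequality alone does not distinguish advancing offsets from regressing ones; {{regressedPartitions(start, end)}} picks out only partition 4.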
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)