You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Ryne Yang (JIRA)" <ji...@apache.org> on 2019/01/24 22:03:00 UTC

[jira] [Commented] (SPARK-26718) structured streaming fetched wrong current offset from kafka

    [ https://issues.apache.org/jira/browse/SPARK-26718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16751625#comment-16751625 ] 

Ryne Yang commented on SPARK-26718:
-----------------------------------

found the issue, it's the rateLimit calculation. if anyone set the `maxOffsetsPerTrigger` to Long.MaxValue. then this could happen.

in class KafkaMicroBatchReader.scala: 
{code:java}
private def rateLimit(
limit: Long,
from: PartitionOffsetMap,
until: PartitionOffsetMap): PartitionOffsetMap = {
val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
val sizes = until.flatMap {
case (tp, end) =>
// If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
val size = end - begin
logDebug(s"rateLimit $tp size is $size")
if (size > 0) Some(tp -> size) else None
}
}
val total = sizes.values.sum.toDouble
if (total < 1) {
until
} else {
until.map {
case (tp, end) =>
tp -> sizes.get(tp).map { size =>
val begin = from.get(tp).getOrElse(fromNew(tp))
val prorate = limit * (size / total)
// Don't completely starve small topicpartitions
val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
// Paranoia, make sure not to return an offset that's past end
Math.min(end, off)
}.getOrElse(end)
}
}
}
{code}
this rateLimit function is the trouble where if limit is set to Long.MaxValue, it could have integer overflow, showing in below example: 
{code:java}
val begin = 100
val limit = Long.MaxValue
val size = 5933L
val total = 5933L.toDouble

val prorate = limit * (size / total)
// Don't completely starve small topicpartitions
val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong

println(off)

// prints -9223372036854775709{code}
root cause is `limit * (size/total)', it would lose precision due to Double type and then `begin + Math.floor(prorate)` will overflow.  

 

> structured streaming fetched wrong current offset from kafka
> ------------------------------------------------------------
>
>                 Key: SPARK-26718
>                 URL: https://issues.apache.org/jira/browse/SPARK-26718
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Ryne Yang
>            Priority: Major
>
> when running spark structured streaming using lib: `"org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"`, we keep getting error regarding current offset fetching:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): java.lang.AssertionError: assertion failed: latest offs
> et -9223372036854775808 does not equal -1
> at scala.Predef$.assert(Predef.scala:170)
> at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371)
> at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.<init>(KafkaMicroBatchReader.scala:329)
> at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314)
> at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> for some reason, looks like fetchLatestOffset returned a Long.MIN_VALUE for one of the partitions. I checked the structured streaming checkpoint, that was correct, it's the currentAvailableOffset was set to Long.MIN_VALUE.
> kafka broker version: 1.1.0.
> lib we used:
> {\{libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0" }}
> how to reproduce:
> basically we started a structured streamer and subscribed a topic of 4 partitions. then produced some messages into topic, job crashed and logged the stacktrace like above.
> also the committed offsets seem fine as we see in the logs: 
> {code:java}
> === Streaming Query ===
> Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 31878627-d473-4ee8-955d-d4d3f3f45eb9]
> Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: {"REVENUEEVENT":{"0":1}}}
> Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: {"REVENUEEVENT":{"0":-9223372036854775808}}}
> {code}
> so spark streaming recorded the correct value for partition: 0, but the current available offsets returned from kafka is showing Long.MIN_VALUE. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org