You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/04/25 06:12:00 UTC

[jira] [Commented] (SPARK-27529) Spark Streaming consumer dies with kafka.common.OffsetOutOfRangeException

    [ https://issues.apache.org/jira/browse/SPARK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16825763#comment-16825763 ] 

Hyukjin Kwon commented on SPARK-27529:
--------------------------------------

Spark 1.x is EOL. Can you check if it works in higher version?

> Spark Streaming consumer dies with kafka.common.OffsetOutOfRangeException
> -------------------------------------------------------------------------
>
>                 Key: SPARK-27529
>                 URL: https://issues.apache.org/jira/browse/SPARK-27529
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 1.5.0
>            Reporter: Dmitry Goldenberg
>            Priority: Major
>
> We have a Spark Streaming consumer which at a certain point started consistently failing upon a restart with the below error.
> Some details:
>  * Spark version is 1.5.0.
>  * Kafka version is 0.8.2.1 (2.10-0.8.2.1).
>  * The topic is configured with: retention.ms=1471228928, max.message.bytes=100000000.
>  * The consumer runs with auto.offset.reset=smallest.
>  * No checkpointing is currently enabled.
> I don't see anything in the Spark or Kafka doc to understand why this is happening. From googling around,
> {noformat}
> https://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/
> Finally, I’ll repeat that any semantics beyond at-most-once require that you have sufficient log retention in Kafka. If you’re seeing things like OffsetOutOfRangeException, it’s probably because you underprovisioned Kafka storage, not because something’s wrong with Spark or Kafka.{noformat}
> Also looking at SPARK-12693 and SPARK-11693, I don't understand the possible causes.
> {noformat}
> You've under-provisioned Kafka storage and / or Spark compute capacity.
> The result is that data is being deleted before it has been processed.{noformat}
> All we're trying to do is start the consumer and consume from the topic from the earliest available offset. Why would we not be able to do that? How can the offsets be out of range if we're saying, just read from the earliest available?
> Since we have the retention.ms set to 1 year and we created the topic just a few weeks ago, I'd not expect any deletion being done by Kafka as we're consuming.
> I'd like to understand the actual cause of this error. Any recommendations on a workaround would be appreciated.
> Stack traces:
> {noformat}
> 2019-04-19 11:35:17,147 ERROR org.apache.spark.scheduler
> .TaskSetManager: Task 10 in stage 147.0 failed 4 times; aborting job
> 2019-04-19 11:35:17,160 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1555682554000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 147.0 failed 4 times, most recent failure: Lost task
> 10.3 in stage 147.0 (TID 2368, 10.150.0.58): kafka.common.OffsetOutOfRangeException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
> at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:69)
> at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:24)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.sca
> la:1280) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268) ~[spark-assembly-1.5.0-hadoop2.4
> .0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267) ~[spark-assembly-1.5.0-hadoop2.4
> .0.jar:1.5.0]
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.ja
> r:?]
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-h
> adoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-h
> adoop2.4.0.jar:1.5.0]
> at scala.Option.foreach(Option.scala:236) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.
> 5.0]
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493) ~[spark-assembly-1.5.0-hadoop2.4
> .0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455) ~[spark-assembly-1.5.0-hadoop2.4.0
> .jar:1.5.0]
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444) ~[spark-assembly-1.5.0-hadoop2.4.0
> .jar:1.5.0]
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.
> 5.0]
> at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:47) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1
> .5.0]
> at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:218) ~[acme-ingest-kafka-spa
> rk-2.0.0-SNAPSHOT.jar:?]
> at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:207) ~[acme-ingest-kafka-spa
> rk-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315) ~[spark-assembly-1
> .5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315) ~[spark-assembly-1
> .5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631) ~[spark-ass
> embly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631) ~[spark-ass
> embly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42) ~[
> spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) ~[spark-a
> ssembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) ~[spark-a
> ssembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0
> .jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-had
> oop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.
> 0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.
> 0.jar:1.5.0]
> at scala.util.Try$.apply(Try.scala:161) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207) ~[spark-asse
> mbly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207) ~[spark-assembly-1.
> 5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207) ~[spark-assembly-1.
> 5.0-hadoop2.4.0.jar:1.5.0]
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206) ~[spark-assembly-1.5.0-hadoop2.4.0.j
> ar:1.5.0]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_201]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_201]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]
> Caused by: kafka.common.OffsetOutOfRangeException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_201]
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_201]
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_201]
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_201]
> at java.lang.Class.newInstance(Class.java:442) ~[?:1.8.0_201]
> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184) ~[acme-ingest-kafka-spark-2.
> 0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193) ~[acme-ingest-kafka-spark-2.0.0-
> SNAPSHOT.jar:?]
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) ~[acme-ingest-kafka-spark-2.0.0-SNA
> PSHOT.jar:?]
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:
> ?]
> at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:69) ~[acme-ingest-kafka-spark-2.
> 0.0-SNAPSHOT.jar:?]
> at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:24) ~[acme-ingest-kafka-spark-2.
> 0.0-SNAPSHOT.jar:?]
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2
> .4.0.jar:1.5.0]
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2
> .4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.
> 0.jar:1.5.0]
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.
> 0.jar:1.5.0]
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.Task.run(Task.scala:88) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> ... 3 more{noformat}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org