You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Dmitry Goldenberg (JIRA)" <ji...@apache.org> on 2019/06/04 14:19:00 UTC

[jira] [Reopened] (SPARK-27567) Spark Streaming consumers (from Kafka) intermittently die with 'SparkException: Couldn't find leaders for Set'

     [ https://issues.apache.org/jira/browse/SPARK-27567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitry Goldenberg reopened SPARK-27567:
---------------------------------------

The issue is not resolved; it still appears intermittently. We have a QA box with Kafka installed and our Spark Streaming job pulling from it. Kafka's replication factor is set to 1 since it is a single-node cluster. We just saw the error there.

> Spark Streaming consumers (from Kafka) intermittently die with 'SparkException: Couldn't find leaders for Set'
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27567
>                 URL: https://issues.apache.org/jira/browse/SPARK-27567
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 1.5.0
>         Environment: GCP / 170~14.04.1-Ubuntu
>            Reporter: Dmitry Goldenberg
>            Priority: Major
>
> Some of our consumers intermittently die with the stack traces I'm including. Once restarted, they run for a while and then die again.
> I can't find any coherent documentation on what this error means or how to go about troubleshooting it. Any help would be appreciated.
> *Kafka version* is 0.8.2.1 (2.10-0.8.2.1).
> Some of the errors seen look like this:
> {noformat}
> ERROR org.apache.spark.scheduler.TaskSchedulerImpl: Lost executor 2 on 10.150.0.54: remote Rpc client disassociated
> {noformat}
> Main error stack trace:
> {noformat}
> 2019-04-23 20:36:54,323 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error generating jobs for time 1556066214000 ms
> org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([hdfs.hbase.acme.attachments,49], [hdfs.hbase.acme.attachmen
> ts,63], [hdfs.hbase.acme.attachments,31], [hdfs.hbase.acme.attachments,9], [hdfs.hbase.acme.attachments,25], [hdfs.hbase.acme.attachments,55], [hdfs.hbase.acme.attachme
> nts,5], [hdfs.hbase.acme.attachments,37], [hdfs.hbase.acme.attachments,7], [hdfs.hbase.acme.attachments,47], [hdfs.hbase.acme.attachments,13], [hdfs.hbase.acme.attachme
> nts,43], [hdfs.hbase.acme.attachments,19], [hdfs.hbase.acme.attachments,15], [hdfs.hbase.acme.attachments,23], [hdfs.hbase.acme.attachments,53], [hdfs.hbase.acme.attach
> ments,1], [hdfs.hbase.acme.attachments,27], [hdfs.hbase.acme.attachments,57], [hdfs.hbase.acme.attachments,39], [hdfs.hbase.acme.attachments,11], [hdfs.hbase.acme.attac
> hments,29], [hdfs.hbase.acme.attachments,33], [hdfs.hbase.acme.attachments,35], [hdfs.hbase.acme.attachments,51], [hdfs.hbase.acme.attachments,45], [hdfs.hbase.acme.att
> achments,21], [hdfs.hbase.acme.attachments,3], [hdfs.hbase.acme.attachments,59], [hdfs.hbase.acme.attachments,41], [hdfs.hbase.acme.attachments,17], [hdfs.hbase.acme.at
> tachments,61]))
> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.j
> ar:?]
> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja
> r:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja
> r:1.5.0]
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.Option.orElse(Option.scala:257) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja
> r:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja
> r:1.5.0]
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.Option.orElse(Option.scala:257) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.util.Try$.apply(Try.scala:161) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181) ~[spark-assembly-1.
> 5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> Exception in thread "main" org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([hdfs.hbase.acme.attachments,49],
> [hdfs.hbase.acme.attachments,63], [hdfs.hbase.acme.attachments,31], [hdfs.hbase.acme.attachments,9], [hdfs.hbase.acme.attachments,25], [hdfs.hbase.acme.attachments,55]
> , [hdfs.hbase.acme.attachments,5], [hdfs.hbase.acme.attachments,37], [hdfs.hbase.acme.attachments,7], [hdfs.hbase.acme.attachments,47], [hdfs.hbase.acme.attachments,13]
> , [hdfs.hbase.acme.attachments,43], [hdfs.hbase.acme.attachments,19], [hdfs.hbase.acme.attachments,15], [hdfs.hbase.acme.attachments,23], [hdfs.hbase.acme.attachments,5
> 3], [hdfs.hbase.acme.attachments,1], [hdfs.hbase.acme.attachments,27], [hdfs.hbase.acme.attachments,57], [hdfs.hbase.acme.attachments,39], [hdfs.hbase.acme.attachments,
> 11], [hdfs.hbase.acme.attachments,29], [hdfs.hbase.acme.attachments,33], [hdfs.hbase.acme.attachments,35], [hdfs.hbase.acme.attachments,51], [hdfs.hbase.acme.attachment
> s,45], [hdfs.hbase.acme.attachments,21], [hdfs.hbase.acme.attachments,3], [hdfs.hbase.acme.attachments,59], [hdfs.hbase.acme.attachments,41], [hdfs.hbase.acme.attachmen
> ts,17], [hdfs.hbase.acme.attachments,61]))
> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
> at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 2019-04-23 20:36:55,265 FATAL Unable to register shutdown hook because JVM is shutting down.
> [Stage 15597:=================================>                   (41 + 6) / 64]Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.InterruptedException
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:559)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896)
> at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:222)
> at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:47)
> at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:218)
> at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:207)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ... 2 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org