You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hurshal Patel (JIRA)" <ji...@apache.org> on 2016/03/16 20:11:33 UTC

[jira] [Resolved] (SPARK-13941) kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker

     [ https://issues.apache.org/jira/browse/SPARK-13941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hurshal Patel resolved SPARK-13941.
-----------------------------------
    Resolution: Not A Bug

There was a versioning mismatch in my spark job which depended on kafka 0.9, which is definitely not supported.

> kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
> -------------------------------------------------------------------
>
>                 Key: SPARK-13941
>                 URL: https://issues.apache.org/jira/browse/SPARK-13941
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.1
>            Reporter: Hurshal Patel
>
> I am connecting to a Kafka cluster with the following (anonymized) code:
> {code}
>   var stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>       ssc, kafkaParams, topics)
>   stream.foreachRDD { rdd =>
>     val df = sqlContext.createDataFrame(rdd.map(bytesToString), stringSchema)
>     df.foreachPartition { partition => 
>       val targetNode = chooseTarget(TaskContext.partitionId)
>       loadPartition(targetNode, partition)
>     }
>   }
> {code}
> I am using Kafka 0.8.2.0-1.kafka1.2.0.p0.2 (Cloudera CDH 5.3.1) and Spark 1.4.1 and this works fine.
> After upgrading to Spark 1.5.1, my tasks are failing (stacktrace is below). Are there any notable changes to the KafkaDirectStream or KafkaRDD that would cause this or does Cloudera's Kafka distribution have known issues with 1.5.1?
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 12407.0 failed 4 times, most recent failure: Lost task 5.3 in stage 12407.0 (TID 55638, 172.18.203.25): org.apache.spark.SparkException: Couldn't connect to leader for topic XXX: java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:164)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:164)
> at scala.util.Either.fold(Either.scala:97)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.connectLeader(KafkaRDD.scala:163)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:155)
> at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:135)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896)
> at org.apache.spark.sql.DataFrame$$anonfun$foreachPartition$1.apply$mcV$sp(DataFrame.scala:1369)
> at org.apache.spark.sql.DataFrame$$anonfun$foreachPartition$1.apply(DataFrame.scala:1369)
> at org.apache.spark.sql.DataFrame$$anonfun$foreachPartition$1.apply(DataFrame.scala:1369)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
> at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
> at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1368)
> at com.foobar.kafka.KafkaLoader.load(KafkaLoader.scala:35)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Couldn't connect to leader for topic XXX: java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:164)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:164)
> at scala.util.Either.fold(Either.scala:97)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.connectLeader(KafkaRDD.scala:163)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:155)
> at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:135)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org