Posted to issues@spark.apache.org by "Vaibhav Khanduja (JIRA)" <ji...@apache.org> on 2016/06/01 21:15:59 UTC

[jira] [Created] (SPARK-15713) Exception using Kafka Streaming: java.lang.NoSuchMethodError: kafka.message.MessageAndMetadata.<init>(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V

Vaibhav Khanduja created SPARK-15713:
----------------------------------------

             Summary: Exception using Kafka Streaming:  java.lang.NoSuchMethodError: kafka.message.MessageAndMetadata.<init>(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
                 Key: SPARK-15713
                 URL: https://issues.apache.org/jira/browse/SPARK-15713
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, Spark Shell, Streaming
    Affects Versions: 1.6.0, 1.5.0
         Environment: Linux Ubuntu
            Reporter: Vaibhav Khanduja


I am having problems making the code below work with Spark Streaming on Kafka. This looks like a bug; I get the same error with a standalone program (a dependency sketch for that case follows).
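
For the standalone case, a minimal sbt dependency sketch (hypothetical; the original build file is not part of this report, and scalaVersion 2.10 is assumed to match the _2.10 artifacts), using the same versions as the shell invocation below:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.5.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.0",
  "org.apache.kafka" %% "kafka"                 % "0.8.0"  // same Kafka pin as the --jars list below
)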

Steps to reproduce:
- Start the Spark shell:

./bin/spark-shell --master local[*] --jars /root/.m2/repository/org/apache/kafka/kafka_2.10/0.8.0/kafka_2.10-0.8.0.jar,/root/.m2/repository/org/apache/spark/spark-streaming-kafka_2.10/1.5.0/spark-streaming-kafka_2.10-1.5.0.jar,/root/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar
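
Since three jars are pinned explicitly here, it can help to confirm at runtime which build of the Kafka classes the shell actually loaded. A small diagnostic, not part of the original repro, that can be pasted into the same shell session:

// Prints the jar that kafka.message.MessageAndMetadata was loaded from,
// i.e. which Kafka build wins on the classpath.
classOf[kafka.message.MessageAndMetadata[_, _]].getProtectionDomain.getCodeSource.getLocation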

- Execute the following code, which reads data from Kafka:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import sys.process.stringSeqToProcess
import scala.language.postfixOps

// Job parameters; fileName, topicStr and selectStr are assigned below, before process() is called.
val usecase = 1
var filePath = "/test"

var fileName = ""
var selectStr = ""
var topicStr = ""

// Concatenate the single output part-file into a rolling <srcPath>.txt:
// ">" truncates the target on the first batch, ">>" appends on later ones.
def mergeFiles(srcPath: String, firstTime: Boolean): Unit = {
    if (firstTime)  {
        Seq("bash", "-c", "cat " + srcPath + "/part-00000 > " + srcPath + ".txt")!!
    } else {
        Seq("bash", "-c", "cat " + srcPath + "/part-00000 >> " + srcPath + ".txt")!!
    }
}
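
Incidentally, the shell-out above assumes a POSIX environment with bash on the PATH. An equivalent pure-JVM version, sketched here with java.nio (mergeFilesNio is a hypothetical name, not part of the repro), would be:

import java.nio.file.{Files, Paths, StandardOpenOption}

def mergeFilesNio(srcPath: String, firstTime: Boolean): Unit = {
    val part = Paths.get(srcPath, "part-00000")  // the single output partition
    val out = Paths.get(srcPath + ".txt")        // the rolling merged file
    val opts =
        if (firstTime) Seq(StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)
        else Seq(StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)
    Files.write(out, Files.readAllBytes(part), opts: _*)
}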

val ssc = new StreamingContext(sc, Seconds(10))  // 10-second batches on the shell's SparkContext
val kParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")  // broker list for the direct stream

def process(fileName: String, topicStr: String, selectStr: String): Unit = {
    var firstTime = true  // safe to mutate: the foreachRDD body runs on the driver
    val kTopic = Set(topicStr)
    // Direct (receiver-less) stream over the Kafka 0.8 consumer API; keep only the message value.
    val kStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kParams, kTopic).map(_._2)
    kStream.foreachRDD { rdd =>
        if (rdd.count() > 0) {  // this count() is the action that triggers the exception below
            sqlContext.read.json(rdd).registerTempTable("mytable")
            if (firstTime) {
                sqlContext.sql("SELECT * FROM mytable").printSchema()  // dump the inferred schema once
            }
            val df = sqlContext.sql(selectStr)
            df.collect.foreach(println)
            df.rdd.saveAsTextFile(fileName)
            mergeFiles(fileName, firstTime)
            firstTime = false
        }
    }
}


fileName = filePath + "topic"
topicStr = "topic"
selectStr = "SELECT Timestamp, Score, MetricType, EntityName FROM mytable"
process(fileName, topicStr, selectStr)

ssc.start()

- The following exception is thrown:

scala> 16/06/01 21:10:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: kafka.message.MessageAndMetadata.<init>(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:222)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1553)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1121)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1121)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/06/01 21:10:20 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.NoSuchMethodError: kafka.message.MessageAndMetadata.<init>(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:222)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1553)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1121)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1121)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/06/01 21:10:21 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NoSuchMethodError: kafka.message.MessageAndMetadata.<init>(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:222)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1553)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1121)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1121)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

16/06/01 21:10:21 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/06/01 21:10:21 ERROR JobScheduler: Error running job streaming job 1464815420000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NoSuchMethodError: kafka.message.MessageAndMetadata.<init>(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:222)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1553)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1121)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1121)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1121)
	at $line32.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$process$1.apply(<console>:44)
	at $line32.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$process$1.apply(<console>:43)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoSuchMethodError: kafka.message.MessageAndMetadata.<init>(Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V
	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:222)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1553)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1121)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1121)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	... 3 more
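
Note: the missing constructor descriptor in the trace, (Ljava/lang/String;ILkafka/message/Message;JLkafka/serializer/Decoder;Lkafka/serializer/Decoder;)V, appears to match the MessageAndMetadata signature used by Kafka 0.8.1 and later, while the classpath above pins kafka_2.10-0.8.0.jar; the Spark 1.5.0 Kafka integration is documented against Kafka 0.8.2.1. A variant of the invocation with the matching Kafka artifact (jar paths are illustrative and assume the 0.8.2.1 artifact is present in the local Maven repository) may therefore avoid the error:

./bin/spark-shell --master local[*] --jars /root/.m2/repository/org/apache/kafka/kafka_2.10/0.8.2.1/kafka_2.10-0.8.2.1.jar,/root/.m2/repository/org/apache/spark/spark-streaming-kafka_2.10/1.5.0/spark-streaming-kafka_2.10-1.5.0.jar,/root/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar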



