Posted to user@spark.apache.org by Wwh 吴 <ww...@hotmail.com> on 2015/07/19 11:41:27 UTC

spark 1.4.0, streaming + kafka throws org.apache.spark.util.TaskCompletionListenerException

hi
I'm testing Spark Streaming with Kafka; the application looks like this:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import net.sf.json.JSONObject

/**
 * Created by asus on 2015/7/16.
 */
object KafkaEventAnalytics {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UserClickCountStat")
    conf.setMaster("spark://Master:7077")
      .set("spark.driver.host", "192.168.94.1")
      .set("spark.cores.max", "1")
    conf.setJars(List("D:\\BigdataResearch\\SparkLearning\\out\\artifacts\\sparklearning_jar\\sparklearning.jar",
      "D:\\BigdataResearch\\SparkLearning\\out\\artifacts\\sparklearning_jar\\spark-streaming-kafka_2.10-1.4.0.jar",
      "D:\\BigdataResearch\\SparkLearning\\out\\artifacts\\sparklearning_jar\\kafka_2.10-0.8.2.1.jar",
      "D:\\BigdataResearch\\SparkLearning\\out\\artifacts\\sparklearning_jar\\json-lib-2.4.jar",
      "D:\\BigdataResearch\\SparkLearning\\out\\artifacts\\sparklearning_jar\\ezmorph-1.0.6.jar"))

    val ssc = new StreamingContext(conf, Seconds(20))

    // Kafka configurations
    val topics = Set("user_events")
    val brokers = "Master:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers) //, "serializer.class" -> "kafka.serializer.StringEncoder")
    // Direct (receiver-less) stream: one Kafka partition per RDD partition
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    // Parse each Kafka message value as a JSON object (the key is just printed)
    val events = kafkaStream.flatMap(line => {
      val data = JSONObject.fromObject(line._2)
      println(line._1)
      //println(data)
      Some(data)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
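The snippet leaves out the final output stage that the logs refer to (map at KafkaEventAnalytics.scala:58, foreachRDD at line 59, with a shuffle stage in between). A rough sketch of what that kind of stage could look like, where the "uid" field and the count-by-key are assumptions rather than the original code:

    // Hypothetical output stage matching the map/foreachRDD and the
    // ShuffleMapStage seen in the logs; "uid" is an assumed field name.
    val userClicks = events.map(json => (json.getString("uid"), 1))
      .reduceByKey(_ + _)
    userClicks.foreachRDD(rdd => rdd.foreach(println))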
When I run this application, the exception below is thrown. Can somebody give some suggestions?
15/07/19 17:27:00 INFO DAGScheduler: Job 2 failed: foreachRDD at KafkaEventAnalytics.scala:59, took 0.182201 s
15/07/19 17:27:00 ERROR JobScheduler: Error running job streaming job 1437298020000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 11, 192.168.94.137): org.apache.spark.util.TaskCompletionListenerException
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:83)
	at org.apache.spark.scheduler.Task.run(Task.scala:72)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/07/19 17:27:20 INFO JobScheduler: Added jobs for time 1437298040000 ms
15/07/19 17:27:20 INFO SparkContext: Starting job: foreachRDD at KafkaEventAnalytics.scala:59
15/07/19 17:27:20 INFO JobScheduler: Starting job streaming job 1437298040000 ms.0 from job set of time 1437298040000 ms
15/07/19 17:27:20 INFO DAGScheduler: Registering RDD 14 (map at KafkaEventAnalytics.scala:58)
15/07/19 17:27:20 INFO DAGScheduler: Got job 3 (foreachRDD at KafkaEventAnalytics.scala:59) with 2 output partitions (allowLocal=false)
15/07/19 17:27:20 INFO DAGScheduler: Final stage: ResultStage 7(foreachRDD at KafkaEventAnalytics.scala:59)
15/07/19 17:27:20 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 6)
15/07/19 17:27:20 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 6)
15/07/19 17:27:20 INFO DAGScheduler: Submitting ShuffleMapStage 6 (MapPartitionsRDD[14] at map at KafkaEventAnalytics.scala:58), which has no missing parents
15/07/19 17:27:20 INFO MemoryStore: ensureFreeSpace(3696) called with curMem=17503, maxMem=1016667832
15/07/19 17:27:20 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.6 KB, free 969.5 MB)
15/07/19 17:27:20 INFO MemoryStore: ensureFreeSpace(2141) called with curMem=21199, maxMem=1016667832
15/07/19 17:27:20 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.1 KB, free 969.5 MB)
15/07/19 17:27:20 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.17.1:51847 (size: 2.1 KB, free: 969.6 MB)
15/07/19 17:27:20 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:874
15/07/19 17:27:20 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 6 (MapPartitionsRDD[14] at map at KafkaEventAnalytics.scala:58)
15/07/19 17:27:20 INFO TaskSchedulerImpl: Adding task set 6.0 with 1 tasks
15/07/19 17:27:20 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 12, 192.168.94.137, ANY, 1584 bytes)
15/07/19 17:27:20 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.94.137:37251 (size: 2.1 KB, free: 267.3 MB)
15/07/19 17:27:20 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 12, 192.168.94.137): org.apache.spark.util.TaskCompletionListenerException
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:83)
	at org.apache.spark.scheduler.Task.run(Task.scala:72)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
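
A note on reading the trace: the stack only shows TaskContextImpl.markTaskCompleted, meaning a task-completion listener threw, and the task's underlying error is not visible here. One way to narrow it down is to make the parse stage defensive, so a malformed JSON record cannot be the hidden failure; a minimal sketch, replacing the flatMap above and assuming json-lib throws on bad input:

    import scala.util.Try

    // Drop records that fail JSON parsing instead of failing the task;
    // if the TaskCompletionListenerException persists with this in place,
    // the cause is more likely on the Kafka consumer/connectivity side
    // than in the parsing code.
    val events = kafkaStream.flatMap { case (key, value) =>
      val parsed = Try(JSONObject.fromObject(value))
      if (parsed.isFailure) println(s"skipping unparsable record, key=$key")
      parsed.toOption
    }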