You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Alonso Isidoro Roman <al...@gmail.com> on 2016/05/25 13:19:34 UTC

about an exception when receiving data from kafka topic using Direct mode of Spark Streaming

Hi, i am receiving this exception when direct spark streaming process tries
to pull data from kafka topic:

16/05/25 11:30:30 INFO CheckpointWriter: Checkpoint for time 1464168630000
ms saved to file
'file:/Users/aironman/my-recommendation-spark-engine/checkpoint/checkpoint-1464168630000',
took 5928 bytes and 8 ms

16/05/25 11:30:30 INFO Executor: Finished task 0.0 in stage 2.0 (TID
2). 1041 bytes result sent to driver
16/05/25 11:30:30 INFO TaskSetManager: Finished task 0.0 in stage 2.0
(TID 2) in 4 ms on localhost (1/1)
16/05/25 11:30:30 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose
tasks have all completed, from pool
16/05/25 11:30:30 INFO DAGScheduler: ResultStage 2 (runJob at
KafkaRDD.scala:98) finished in 0,004 s
16/05/25 11:30:30 INFO DAGScheduler: Job 2 finished: runJob at
KafkaRDD.scala:98, took 0,008740 s
<------>
someMessages is [Lscala.Tuple2;@2641d687
(null,{"userId":"someUserId","productId":"0981531679","rating":6.0})
<------>
<---POSSIBLE SOLUTION--->
16/05/25 11:30:30 INFO JobScheduler: Finished job streaming job
1464168630000 ms.0 from job set of time 1464168630000 ms
16/05/25 11:30:30 INFO KafkaRDD: Removing RDD 105 from persistence list
16/05/25 11:30:30 INFO JobScheduler: Total delay: 0,020 s for time
1464168630000 ms (execution: 0,012 s)
16/05/25 11:30:30 ERROR JobScheduler: Error running job streaming job
1464168630000 ms.0*java.lang.IllegalStateException: Adding new inputs,
transformations, and output operations after starting a context is not
supported
	at* org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
	at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
	at org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
	at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
	at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
	at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
	at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
16/05/25 11:30:30 INFO BlockManager: Removing RDD 105


This is the code that rises the exception in the spark streaming process:

try{
    messages.foreachRDD( rdd =>{
      val count = rdd.count()
      if (count > 0){
        //someMessages should be AmazonRating...
        val someMessages = rdd.take(count.toInt)
        println("<------>")
        println("someMessages is " + someMessages)
        someMessages.foreach(println)
        println("<------>")
        println("<---POSSIBLE SOLUTION--->")
        messages
        .map { case (_, jsonRating) =>
          val jsValue = Json.parse(jsonRating)
          AmazonRating.amazonRatingFormat.reads(jsValue) match {
            case JsSuccess(rating, _) => rating
            case JsError(_) => AmazonRating.empty
          }
             }
        .filter(_ != AmazonRating.empty)
        *//I think that this line provokes the runtime exception...*
*        .foreachRDD(_.foreachPartition(it =>
recommender.predictWithALS(it.toSeq)))*

        println("<---POSSIBLE SOLUTION--->")

      }
      }
    )
    }catch{
      case e: IllegalArgumentException => {println("illegal arg.
exception")};
      case e: IllegalStateException    => {println("illegal state
exception")};
      case e: ClassCastException       => {println("ClassCastException")};
      case e: Exception                => {println(" Generic Exception")};
    }finally{

      println("Finished taking data from kafka topic...")
    }

Recommender object:

*def predictWithALS(ratings: Seq[AmazonRating])* = {
    // train model
    val myRatings = ratings.map(toSparkRating)
    val myRatingRDD = sc.parallelize(myRatings)

    val startAls = DateTime.now
    val model = ALS.train((sparkRatings ++
myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)

    val myProducts = myRatings.map(_.product).toSet
    val candidates = sc.parallelize((0 until
productDict.size).filterNot(myProducts.contains))

    // get ratings of all products not in my history ordered by rating
(higher first) and only keep the first NumRecommendations
    val myUserId = userDict.getIndex(MyUsername)
    val recommendations = model.predict(candidates.map((myUserId,
_))).collect
    val endAls = DateTime.now
    val result =
recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
    val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds

    println(s"ALS Time: $alsTime seconds")
    result
  }
}

And this is the kafka producer that push the json data within the topic:

object AmazonProducerExample {
  def main(args: Array[String]): Unit = {

    val productId = args(0).toString
    val userId = args(1).toString
    val rating = args(2).toDouble
    val topicName = "amazonRatingsTopic"

    val producer = Producer[String](topicName)

    //0981531679 is Scala Puzzlers...
    //AmazonProductAndRating
    AmazonPageParser.parse(productId,userId,rating).onSuccess { case
amazonRating =>
      //Is this the correct way? the best performance? possibly not, what
about using avro or parquet?
      producer.send(Json.toJson(amazonRating).toString)
      //producer.send(amazonRating)
      println("amazon product with rating sent to kafka cluster..." +
amazonRating.toString)
      System.exit(0)
    }

  }
}


I have written a stack overflow post
<http://stackoverflow.com/questions/37303202/about-an-error-accessing-a-field-inside-tuple2>,
with more details, please help, i am stuck with this issue and i don't know
how to continue.

Alonso Isidoro Roman
[image: https://]about.me/alonso.isidoro.roman
<https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>