Posted to user@spark.apache.org by "kai.lu" <qi...@falkonry.com> on 2014/12/24 01:13:28 UTC

Exception after changing RDDs

Hi All,

We are getting an exception after repeatedly adding one RDD to another RDD.

We first declared an empty RDD "A" and then received DStreams of A and B
records from Kafka; for each RDD in the DStream of A records, we keep unioning
it into the existing RDD "A", and we join each batch of B records against it.
The error happens when we try to use the updated RDD "A".

Could anybody give some hints about what's going wrong here? Thanks!

*Sample Code:*

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._            // pair RDD ops such as join (Spark 1.x)
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("newAPP")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

// Declare an empty RDD for A
@volatile var rddA: RDD[A] = ssc.sparkContext.emptyRDD[A]

// Receive A and B data from Kafka
val kafkaMessages = KafkaUtils.createStream(ssc, "localhost:2181",
  "CONSUMERGROUP", Map("TOPIC" -> 1)).map(_._2)...
val dstreamA: DStream[A] = kafkaMessages.map(msg =>
  JSONConverter.fromJSON[A](msg)).flatMap(_.toOption)...
val dstreamB: DStream[B] = kafkaMessages.map(msg =>
  JSONConverter.fromJSON[B](msg)).flatMap(_.toOption)...

// Update RDD A from DStream A
dstreamA.foreachRDD(rdd => rddA = rddA ++ rdd)

// Join DStream B with the existing RDD A
val results: DStream[(A, B)] = dstreamB.transform(rdd => {
  val id_A: RDD[(String, A)] = rddA.keyBy(_.id)
  val id_B: RDD[(String, B)] = rdd.keyBy(_.id)
  val rddA_B: RDD[(A, B)] = id_A.join(id_B).map { case (id, (a, b)) =>
    (a, b)
  }
  rddA_B
})


From my observation, the exception happens when we do rddA.keyBy(_.id) inside
the transform.
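
Our guess (we may be wrong) is that the lineage of rddA still points back to
BlockRDDs from earlier batches, whose blocks have since been cleaned up. One
workaround we are considering (not sure it is the right fix) is to replace
the foreachRDD update above with a version that caches and forces evaluation
of the accumulated RDD, so later batches read the cached data instead of
walking the lineage back to the receiver's blocks. This is only a rough
sketch; the unpersist() of the previous copy is our guess at the cleanup
needed:

    dstreamA.foreachRDD { rdd =>
      val updated = (rddA ++ rdd).cache()
      updated.count()      // materialize while this batch's blocks still exist
      val previous = rddA
      rddA = updated
      previous.unpersist() // release the previously cached copy
    }

We realize the lineage of rddA would still keep growing batch after batch, so
we would probably also need to checkpoint it periodically.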

*Exception Output:*

14/12/23 15:35:23 WARN storage.BlockManager: Block input-0-1419377723400 already exists on this machine; not re-adding it
14/12/23 15:35:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/12/23 15:35:34 WARN storage.BlockManager: Block input-0-1419377734600 already exists on this machine; not re-adding it
14/12/23 15:35:37 ERROR scheduler.DAGSchedulerActorSupervisor: eventProcesserActor failed; shutting down SparkContext
org.apache.spark.SparkException: Attempted to use BlockRDD[5] at BlockRDD at ReceiverInputDStream.scala:69 after its blocks have been removed!
    at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
    at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:216)
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:216)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:215)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1297)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1307)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1306)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1306)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1304)
    at scala.collection.immutable.List.foreach(List.scala:318)
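
We also wondered whether updateStateByKey would be a better fit than mutating
a var: keep the accumulated A records as managed streaming state and join
each batch of B against it. Again, this is only a rough sketch of what we
have in mind (it keeps just the latest A per id, and assumes the same id
field we key by above):

    import org.apache.spark.streaming.StreamingContext._ // pair DStream ops (Spark 1.x)

    // Keep the latest A per id as streaming state (checkpointing is already enabled above).
    val stateA: DStream[(String, A)] = dstreamA
      .map(a => (a.id, a))
      .updateStateByKey[A]((newValues: Seq[A], current: Option[A]) =>
        newValues.lastOption.orElse(current))

    // Join each batch of B against the current state of A.
    val joined: DStream[(A, B)] = stateA
      .join(dstreamB.map(b => (b.id, b)))
      .map { case (id, (a, b)) => (a, b) }

Would either of these be a reasonable direction, or is there a better way to
join a stream against data accumulated from another stream?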



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Exception-after-changing-RDDs-tp20841.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org