Posted to issues@spark.apache.org by "Ravindra Pesala (JIRA)" <ji...@apache.org> on 2014/08/05 13:59:11 UTC

[jira] [Commented] (SPARK-2408) RDD.map(func) dependencies issue after checkpoint & count

    [ https://issues.apache.org/jira/browse/SPARK-2408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14086150#comment-14086150 ] 

Ravindra Pesala commented on SPARK-2408:
----------------------------------------

Yes, Daniel is right: this is a problem with the Spark shell. I went through the code of the Spark shell and ClosureCleaner, and I don't think it is an issue in ClosureCleaner, since it keeps only the outer object. But the Spark shell generates the class for the last line, str_rdd.map(test).count, in the following manner. For each line entered into the Spark shell, it creates one class and imports the previous attributes/names into it:

class $read extends Serializable {
  class $iwC extends Serializable {
    val $VAL3 = $line3.$read.INSTANCE;
    import $VAL3.$iw.$iw.`sc`;
    class $iwC extends Serializable {
      import org.apache.spark.SparkContext._
      class $iwC extends Serializable {
        val $VAL5 = $line5.$read.INSTANCE;
        import $VAL5.$iw.$iw.$iw.$iw.`r`;
        val $VAL9 = $line9.$read.INSTANCE;
        import $VAL9.$iw.$iw.$iw.$iw.`str_arr`;
        val $VAL10 = $line10.$read.INSTANCE;
        import $VAL10.$iw.$iw.$iw.$iw.`str_rdd`;
        val $VAL11 = $line11.$read.INSTANCE;
        import $VAL11.$iw.$iw.$iw.$iw.`test`;
        class $iwC extends Serializable {
          val res0 = str_rdd.map(test).count
        }
        val $iw = new $iwC;
      }
      val $iw = new $iwC;
    }
    val $iw = new $iwC;
  }
  val $iw = new $iwC;
}
object $read {
  val INSTANCE = new $read();
}

Here the outer object contains all the previous imports, wrapped in nested classes. Since the import related to val r = new scala.util.Random() is present in one of the outer classes, Spark tries to serialize that outer object as well, and it fails because scala.util.Random is not serializable. I think we should either find a way to manage the imports of previous requests in the generated code, or handle this scenario in ClosureCleaner. Please comment.
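
To make the failure mode concrete, here is a minimal standalone sketch of the same capture pattern. It is my own illustration, not Spark's actual generated code: Wrapper stands in for a generated $iwC wrapper, and NotSer stands in for scala.util.Random, which is not Serializable under the Scala 2.10 that Spark 0.9/1.0 builds against. Eta-expanding a method of Wrapper produces a closure that captures the whole enclosing instance, including the non-serializable field:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Deliberately non-serializable, standing in for scala.util.Random
// (which is not Serializable in Scala 2.10).
class NotSer

// Stands in for a generated $iwC wrapper: Serializable itself, but it
// holds a non-serializable field alongside the user's function.
class Wrapper extends Serializable {
  val r = new NotSer
  def test(a: String): String = a
  // Eta-expansion turns the method into a closure that captures `this`,
  // so the whole Wrapper (including `r`) ends up in the object graph.
  def closure: String => String = test
}

object Repro extends App {
  val f = new Wrapper().closure
  // Throws java.io.NotSerializableException: NotSer, mirroring the
  // NotSerializableException: scala.util.Random in the report below.
  new ObjectOutputStream(new ByteArrayOutputStream).writeObject(f)
}

Until the generated imports are managed or ClosureCleaner handles this, one possible shell-side workaround (a sketch only; I have not verified it against this exact report) is to define the function as a function literal instead of a def. The function value is constructed once, captures nothing, and is passed to map as-is, so no new closure over the current $iwC is created at the call site:

val test = (a: String) => a  // a Function1 value that captures nothing
str_rdd.map(test).count      // the value itself is shipped; no eta-expansion over $iwC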

> RDD.map(func) dependencies issue after checkpoint & count
> ---------------------------------------------------------
>
>                 Key: SPARK-2408
>                 URL: https://issues.apache.org/jira/browse/SPARK-2408
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 0.9.1, 1.0.0
>            Reporter: Daniel Fry
>
> I am noticing strange behavior with a simple example use of rdd.checkpoint(). 
> You can paste the following code into any spark-shell (e.g. with MASTER=local[*]): 
> // build an array of 100 random lowercase strings of length 10
> val r = new scala.util.Random()
> val str_arr = (1 to 100).map(a => (1 to 10).map(b => new Character(((Math.abs(r.nextInt) % 26) + 97).toChar)).mkString(""))
> // make this into an rdd
> val str_rdd = sc.parallelize(str_arr)
> // checkpoint & count
> sc.setCheckpointDir("hdfs://[namenode]:54310/path/to/some/spark_checkpoint_dir")
> str_rdd.checkpoint()
> str_rdd.count
> // rdd.map some dummy function
> def test(a : String) : String = { return a }
> str_rdd.map(test).count
> This results in a surprising exception: 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.util.Random
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>         at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org