Posted to user@spark.apache.org by randylu <ra...@gmail.com> on 2014/04/23 11:45:38 UTC

about rdd.filter()

  My code looks like this:
    rdd2 = rdd1.filter(_._2.length > 1)
    rdd2.collect()
  It works well, but if I use a variable /num/ instead of 1:
    var num = 1
    rdd2 = rdd1.filter(_._2.length > num)
    rdd2.collect()
  it fails at rdd2.collect().
  That seems strange. Why does it happen?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/about-rdd-filter-tp4657.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: about rdd.filter()

Posted by randylu <ra...@gmail.com>.
@Cheng Lian-2, Sourav Chandra, thanks very much.
   You are right! The situation is just as you described.




Re: about rdd.filter()

Posted by randylu <ra...@gmail.com>.
14/04/23 17:17:40 INFO DAGScheduler: Failed to run collect at SparkListDocByTopic.scala:407
Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
        at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: SparkListDocByTopic$EnvParameter
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1013)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1002)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1000)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1000)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:772)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:892)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply(DAGScheduler.scala:889)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply(DAGScheduler.scala:889)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:889)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:888)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:888)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:592)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:143)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)




Re: about rdd.filter()

Posted by Lukas Nalezenec <lu...@firma.seznam.cz>.
Hi,
can you please add the stack trace?
Lukas



Re: about rdd.filter()

Posted by Cheng Lian <li...@gmail.com>.
Does your job fail because of a serialization error? One situation I can
think of is something like this:

class NotSerializable(val n: Int)
val obj = new NotSerializable(1)
sc.makeRDD(1 to 3).filter(_ > obj.n)

If you reference a field of an object inside a closure, the closure captures
not only that field but the whole enclosing object. If that object is not
serializable, then RDD DAG serialization fails. You can work around this by
first copying the field into a separate variable:

class NotSerializable(val n: Int)
val obj = new NotSerializable(1)
val x = obj.n
sc.makeRDD(1 to 3).filter(_ > x)




Re: about rdd.filter()

Posted by Sourav Chandra <so...@livestream.com>.
This can happen if the variable is defined in such a way that it pulls a
reference to its own enclosing class into the closure. Serialization then
tries to serialize the whole outer class instance, which is not
serializable, and the job fails.
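
To illustrate the point above without a Spark cluster, here is a minimal
sketch that uses plain Java serialization, which is what Spark's default
closure serializer relies on. The class name EnvParameter is borrowed from the
stack trace, but its body, the method names capturingThis and capturingLocal,
and the ClosureCheck helper are all hypothetical, and I am assuming
Scala 2.12 or 2.13:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a driver-side class that does NOT extend Serializable.
class EnvParameter(var num: Int) {
  // `num` here really means `this.num`, so the closure captures `this`.
  def capturingThis: Int => Boolean = x => x > num

  // Copy the field into a local val first; the closure then captures only an Int.
  def capturingLocal: Int => Boolean = {
    val threshold = num
    x => x > threshold
  }
}

object ClosureCheck {
  // Returns true if Java serialization accepts the object, false if it
  // throws NotSerializableException.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val env = new EnvParameter(1)
    println(s"closure over field:     ${isSerializable(env.capturingThis)}")
    println(s"closure over local val: ${isSerializable(env.capturingLocal)}")
  }
}
```

The first closure should be rejected because it drags the whole EnvParameter
instance along, while the second should serialize fine. Making the enclosing
class extend Serializable is another option, but then the whole object is
shipped with every task, so copying the field is usually the lighter fix.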





