Posted to user@spark.apache.org by Yadid Ayzenberg <ya...@media.mit.edu> on 2013/11/03 19:33:06 UTC

java.io.NotSerializableException on RDD count() in Java

Hi All,

My original RDD contains arrays of doubles. When applying a count()
operator to the original RDD I get the result as expected.
However, when I run a map on the original RDD in order to generate a new
RDD with only the first element of each array, and try to apply count()
to the newly generated RDD, I get the following exception:

19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  - Failed 
to run count at AnalyticsEngine.java:133
[error] (run-main) org.apache.spark.SparkException: Job failed: 
java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
org.apache.spark.SparkException: Job failed: 
java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
     at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
     at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
     at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
     at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
     at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
     at 
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
     at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
     at 
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)


If I run a take() operation on the new RDD I receive the results as
expected. Here is my code:


JavaRDD<Double> rdd2 =  rdd.flatMap( new FlatMapFunction<Tuple2<Object, 
BSONObject>, Double>() {
         @Override
         public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
           BSONObject doc = e._2();
           List<List<Double>> vals = (List<List<Double>>)doc.get("data");
           List<Double> results = new ArrayList<Double>();
           for (int i=0; i< vals.size();i++ )
               results.add((Double)vals.get(i).get(0));
           return results;

         }
         });

         logger.info("Take: {}", rdd2.take(100));
         logger.info("Count: {}", rdd2.count());


Any ideas on what I am doing wrong?

Thanks,

Yadid



-- 
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139




Re: java.io.NotSerializableException on RDD count() in Java

Posted by Patrick Wendell <pw...@gmail.com>.
No problem - thanks for helping us diagnose this!

On Tue, Nov 5, 2013 at 5:04 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> Ah, I see. Thanks very much for your assistance, Patrick and Reynold.
> As a workaround for now, I implemented the SC field as transient and it's
> working fine.
>
> Yadid
>
>
>
> On 11/3/13 9:05 PM, Reynold Xin wrote:
>
> Yea, so every inner class actually contains a field referencing the outer
> class. In your case, the anonymous class DoubleFlatMapFunction actually has
> a this$0 field referencing the outer class AnalyticsEngine, which is why
> Spark is trying to serialize AnalyticsEngine.
>
> In the Scala API, the closure (which is really just implemented as anonymous
> classes) has a field called "$outer", and Spark uses a "closure cleaner"
> that goes into the anonymous class to remove the $outer field if it is not
> used in the closure itself. In Java, the compiler generates a field called
> "this$0", and thus the closure cleaner doesn't find it and can't "clean" it
> properly.
>
> I will work on a fix for the closure cleaner to clean this up as well.
> In the meantime, you can work around this by either defining your anonymous
> class as a static class or marking the JavaSparkContext field as transient.
>
>
>
> On Sun, Nov 3, 2013 at 7:32 PM, Patrick Wendell <pw...@gmail.com> wrote:
>>
>> Hm, I think you are triggering a bug in the Java API where closures
>> may not be properly cleaned. I think @rxin has reproduced this,
>> deferring to him.
>>
>> - Patrick
>>
>> On Sun, Nov 3, 2013 at 5:25 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>> wrote:
>> > Code is below. In the code, rdd.count() works, but rdd2.count() fails.
>> >
>> > public class AnalyticsEngine  implements Serializable {
>> >
>> >     private static AnalyticsEngine engine=null;
>> >     private JavaSparkContext sc;
>> >
>> >     final Logger logger =
>> > LoggerFactory.getLogger(AnalyticsEngine.class);
>> >     private Properties prop;
>> >
>> >     String db_host;
>> >
>> >     private AnalyticsEngine()
>> >     {
>> >         System.setProperty("spark.serializer",
>> > "org.apache.spark.serializer.KryoSerializer");
>> >         System.setProperty("spark.kryo.registrator",
>> > "edu.mit.bsense.MyRegistrator");
>> >         sc = new JavaSparkContext("local[4]","TestSpark");
>> >         Properties prop = new Properties();
>> >         try {
>> >             prop.load(new FileInputStream("config.properties"));
>> >
>> >
>> >             db_host = prop.getProperty("database_host1");
>> >             logger.info("Database host: {}", db_host);
>> >         }  catch (FileNotFoundException ex)
>> >                 {
>> >                     logger.info("Could not read config.properties: " +
>> > ex.toString());
>> >
>> >                 } catch (IOException ex)
>> >                 {
>> >                     logger.info("Could not read config.properties: " +
>> > ex.toString());
>> >
>> >                 }
>> >     }
>> >
>> >
>> >         public void getData()
>> >         {
>> >         Configuration conf = new Configuration();
>> >
>> >         String conf_url = "mongodb://" + db_host + "/test.data1"; //this
>> > is
>> > the data partition
>> >         conf.set("mongo.input.uri", conf_url);
>> >
>> >
>> >         conf.set("mongo.input.query",
>> > "{\"streamId\":\""+"13"+"\"},{\"data\":1}");
>> >         conf.set("mongo.input.split_size","64");
>> >
>> >         JavaPairRDD<Object,BSONObject> rdd = sc.newAPIHadoopRDD(conf,
>> > MongoInputFormat.class, Object.class, BSONObject.class);
>> >
>> >         rdd.cache();
>> >
>> >         logger.info("Count of rdd: {}", rdd.count());
>> >
>> >
>> > logger.info("==========================================================================");
>> >
>> >
>> >
>> >         JavaDoubleRDD rdd2 =  rdd.flatMap( new
>> > DoubleFlatMapFunction<Tuple2<Object, BSONObject>>() {
>> >
>> >         @Override
>> >         public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>> >           BSONObject doc = e._2();
>> >           BasicDBList vals = (BasicDBList)doc.get("data");
>> >
>> >           List<Double> results = new ArrayList<Double>();
>> >           for (int i=0; i< vals.size();i++ )
>> > results.add((Double)((BasicDBList)vals.get(i)).get(0));
>> >
>> >           return results;
>> >
>> >         }
>> >         });
>> >
>> >         logger.info("Take: {}", rdd2.take(100));
>> >         logger.info("Count: {}", rdd2.count());
>> >
>> >
>> >     }
>> >
>> >     }
>> >
>> >
>> > On 11/3/13 8:19 PM, Patrick Wendell wrote:
>> >>
>> >> Thanks that would help. This would be consistent with there being a
>> >> reference to the SparkContext itself inside of the closure. Just want
>> >> to make sure that's not the case.
>> >>
>> >> On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>> >> wrote:
>> >>>
>> >>> I'm running in local[4] mode - so there are no slave machines. Full
>> >>> stack
>> >>> trace:
>> >>>
>> >>>
>> >>> (run-main) org.apache.spark.SparkException: Job failed:
>> >>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>> >>> org.apache.spark.SparkException: Job failed:
>> >>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>> >>>      at
>> >>>
>> >>>
>> >>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>> >>>      at
>> >>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>> >>> [debug]     Thread run-main exited.
>> >>> [debug] Interrupting remaining threads (should be all daemons).
>> >>> [debug] Sandboxed run complete..
>> >>> java.lang.RuntimeException: Nonzero exit code: 1
>> >>>      at scala.sys.package$.error(package.scala:27)
>> >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>> >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>> >>>      at scala.Option.foreach(Option.scala:236)
>> >>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>> >>>      at sbt.Defaults$.toError(Defaults.scala:34)
>> >>>      at
>> >>>
>> >>>
>> >>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>> >>>      at
>> >>>
>> >>>
>> >>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>> >>>      at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>> >>>      at
>> >>> sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>> >>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
>> >>>      at
>> >>>
>> >>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>> >>>      at
>> >>>
>> >>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>> >>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>> >>>      at sbt.Execute.work(Execute.scala:244)
>> >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>> >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>> >>>      at
>> >>>
>> >>>
>> >>> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>> >>>      at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>> >>>      at
>> >>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> >>>      at
>> >>>
>> >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>> >>>      at
>> >>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> >>>      at
>> >>>
>> >>>
>> >>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>> >>>      at
>> >>>
>> >>>
>> >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>> >>>      at java.lang.Thread.run(Thread.java:695)
>> >>>
>> >>> when I add implements Serializable to my class, I get the following
>> >>> stack
>> >>> trace:
>> >>>
>> >>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>> >>> java.io.NotSerializableException:
>> >>> org.apache.spark.api.java.JavaSparkContext
>> >>> org.apache.spark.SparkException: Job failed:
>> >>> java.io.NotSerializableException:
>> >>> org.apache.spark.api.java.JavaSparkContext
>> >>>
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>> >>>      at
>> >>>
>> >>>
>> >>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>> >>>      at
>> >>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>> >>>      at
>> >>>
>> >>>
>> >>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>> >>> [debug]     Thread run-main exited.
>> >>> [debug] Interrupting remaining threads (should be all daemons).
>> >>> [debug] Sandboxed run complete..
>> >>> java.lang.RuntimeException: Nonzero exit code: 1
>> >>>      at scala.sys.package$.error(package.scala:27)
>> >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>> >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>> >>>      at scala.Option.foreach(Option.scala:236)
>> >>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>> >>>      at sbt.Defaults$.toError(Defaults.scala:34)
>> >>>      at
>> >>>
>> >>>
>> >>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>> >>>      at
>> >>>
>> >>>
>> >>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>> >>>      at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>> >>>      at
>> >>> sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>> >>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
>> >>>      at
>> >>>
>> >>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>> >>>      at
>> >>>
>> >>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>> >>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>> >>>      at sbt.Execute.work(Execute.scala:244)
>> >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>> >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>> >>>      at
>> >>>
>> >>>
>> >>> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>> >>>      at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>> >>>      at
>> >>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> >>>      at
>> >>>
>> >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>> >>>      at
>> >>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>> >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>> >>>      at
>> >>>
>> >>>
>> >>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>> >>>      at
>> >>>
>> >>>
>> >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>> >>>      at java.lang.Thread.run(Thread.java:695)
>> >>>
>> >>> I can post my code if that helps
>> >>>
>> >>>
>> >>>
>> >>> On 11/3/13 8:05 PM, Patrick Wendell wrote:
>> >>>>
>> >>>> If you look in the UI, are there failures on any of the slaves that
>> >>>> you can give a  stack trace for? That would narrow down where the
>> >>>> serialization error is happening.
>> >>>>
>> >>>> Unfortunately this code path doesn't print a full stack trace which
>> >>>> makes it harder to debug where the serialization error comes from.
>> >>>>
>> >>>> Could you post all of your code?
>> >>>>
>> >>>> Also, just wondering, what happens if you just go ahead and add
>> >>>> "extends Serializable" to AnalyticsEngine class? It's possible this
>> >>>> is
>> >>>> happening during closure serialization, which will use the closure
>> >>>> serializer (which is by default Java).
>> >>>>
>> >>>> - Patrick
>> >>>>
>> >>>> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>> >>>> wrote:
>> >>>>>
>> >>>>> Yes, I tried that as well (it is currently registered with Kryo) -
>> >>>>> although
>> >>>>> it doesn't make sense to me (and doesn't solve the problem). I also
>> >>>>> made
>> >>>>> sure
>> >>>>> my registration was running:
>> >>>>> DEBUG org.apache.spark.serializer.KryoSerializer  - Running user
>> >>>>> registrator: edu.mit.bsense.MyRegistrator
>> >>>>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG
>> >>>>> org.apache.spark.serializer.KryoSerializer  - Running user
>> >>>>> registrator:
>> >>>>> edu.mit.bsense.MyRegistrator
>> >>>>>
>> >>>>> edu.mit.bsense.AnalyticsEngine is the class containing the SC which
>> >>>>> instantiates the RDDs and runs the map() and count().
>> >>>>> Can you explain why it needs to be serialized?
>> >>>>>
>> >>>>> Also, when running count() on my original RDD (pre map) I get the
>> >>>>> right
>> >>>>> answer - this means the classes of data in the RDD are serializable.
>> >>>>> It's only when I run map(), and then count() on the new RDD, that I
>> >>>>> get this
>> >>>>> exception. My map does not introduce any new classes - it just
>> >>>>> iterates
>> >>>>> over
>> >>>>> the existing data.
>> >>>>>
>> >>>>> Any ideas?
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>> >>>>>>
>> >>>>>> edu.mit.bsense.AnalyticsEngine
>> >>>>>>
>> >>>>>> Look at the exception. Basically, you'll need to register every
>> >>>>>> class
>> >>>>>> type that is recursively used by BSONObject.
>> >>>>>>
>> >>>>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg
>> >>>>>> <ya...@media.mit.edu>
>> >>>>>> wrote:
>> >>>>>>>
>> >>>>>>> Hi Patrick,
>> >>>>>>>
>> >>>>>>> I am in fact using Kryo and I'm registering BSONObject.class
>> >>>>>>> (which
>> >>>>>>> is
>> >>>>>>> the class
>> >>>>>>> holding the data) in my KryoRegistrator.
>> >>>>>>> I'm not sure what other classes I should be registering.
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>>
>> >>>>>>> Yadid
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>> >>>>>>>>
>> >>>>>>>> The problem is you are referencing a class that does not "extend
>> >>>>>>>> serializable" in the data that you shuffle. Spark needs to send
>> >>>>>>>> all
>> >>>>>>>> shuffle data over the network, so it needs to know how to
>> >>>>>>>> serialize
>> >>>>>>>> them.
>> >>>>>>>>
>> >>>>>>>> One option is to use Kryo for network serialization as described
>> >>>>>>>> here
>> >>>>>>>> - you'll have to register all the classes that get serialized,
>> >>>>>>>> though.
>> >>>>>>>>
>> >>>>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>> >>>>>>>>
>> >>>>>>>> Another option is to write a wrapper class that "extends
>> >>>>>>>> externalizable" and write the serialization yourself.
>> >>>>>>>>
>> >>>>>>>> - Patrick
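
A minimal sketch of such a registrator, reusing the MyRegistrator and
BSONObject names that appear elsewhere in this thread (BasicBSONObject is
added here only as an example of a concrete document class; exactly which
classes need registering depends on what the stored documents contain):

package edu.mit.bsense;

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;

public class MyRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        // Register the document interface plus a concrete document type; per
        // the advice above, every class reachable from the shuffled records
        // may need to be registered as well.
        kryo.register(BSONObject.class);
        kryo.register(BasicBSONObject.class);
    }
}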
>> >>>>>>>>
>> >>>>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg
>> >>>>>>>> <ya...@media.mit.edu>
>> >>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi All,
>> >>>>>>>>>
>> >>>>>>>>> My original RDD contains arrays of doubles. When applying a
>> >>>>>>>>> count()
>> >>>>>>>>> operator
>> >>>>>>>>> to the original RDD I get the result as expected.
>> >>>>>>>>> However when I run a map on the original RDD in order to
>> >>>>>>>>> generate a
>> >>>>>>>>> new
>> >>>>>>>>> RDD
>> >>>>>>>>> with only the first element of each array, and try to apply
>> >>>>>>>>> count()
>> >>>>>>>>> to
>> >>>>>>>>> the
>> >>>>>>>>> new generated RDD I get the following exception:
>> >>>>>>>>>
>> >>>>>>>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler
>> >>>>>>>>> -
>> >>>>>>>>> Failed
>> >>>>>>>>> to
>> >>>>>>>>> run count at AnalyticsEngine.java:133
>> >>>>>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>> >>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>> >>>>>>>>> org.apache.spark.SparkException: Job failed:
>> >>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>> >>>>>>>>>         at
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>> >>>>>>>>>         at
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>> >>>>>>>>>         at
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>> >>>>>>>>>         at
>> >>>>>>>>>
>> >>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >>>>>>>>>         at
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>> >>>>>>>>>         at
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>> >>>>>>>>>         at
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>> >>>>>>>>>         at
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>> >>>>>>>>>         at
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>> >>>>>>>>>         at
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> If I run a take() operation on the new RDD I receive the results
>> >>>>>>>>> as
>> >>>>>>>>> expected. Here is my code:
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> JavaRDD<Double> rdd2 =  rdd.flatMap( new
>> >>>>>>>>> FlatMapFunction<Tuple2<Object,
>> >>>>>>>>> BSONObject>, Double>() {
>> >>>>>>>>>             @Override
>> >>>>>>>>>             public Iterable<Double> call(Tuple2<Object,
>> >>>>>>>>> BSONObject>
>> >>>>>>>>> e)
>> >>>>>>>>> {
>> >>>>>>>>>               BSONObject doc = e._2();
>> >>>>>>>>>               List<List<Double>> vals =
>> >>>>>>>>> (List<List<Double>>)doc.get("data");
>> >>>>>>>>>               List<Double> results = new ArrayList<Double>();
>> >>>>>>>>>               for (int i=0; i< vals.size();i++ )
>> >>>>>>>>>                   results.add((Double)vals.get(i).get(0));
>> >>>>>>>>>               return results;
>> >>>>>>>>>
>> >>>>>>>>>             }
>> >>>>>>>>>             });
>> >>>>>>>>>
>> >>>>>>>>>             logger.info("Take: {}", rdd2.take(100));
>> >>>>>>>>>             logger.info("Count: {}", rdd2.count());
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Any ideas on what I am doing wrong ?
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>>
>> >>>>>>>>> Yadid
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> --
>> >>>>>>>>> Yadid Ayzenberg
>> >>>>>>>>> Graduate Student and Research Assistant
>> >>>>>>>>> Affective Computing
>> >>>>>>>>> Phone: 617-866-7226
>> >>>>>>>>> Room: E14-274G
>> >>>>>>>>> MIT Media Lab
>> >>>>>>>>> 75 Amherst st, Cambridge, MA, 02139
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>> --
>> >>>>>>> Yadid Ayzenberg
>> >>>>>>> Graduate Student and Research Assistant
>> >>>>>>> Affective Computing
>> >>>>>>> Phone: 617-866-7226
>> >>>>>>> Room: E14-274G
>> >>>>>>> MIT Media Lab
>> >>>>>>> 75 Amherst st, Cambridge, MA, 02139
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>> --
>> >>>>> Yadid Ayzenberg
>> >>>>> Graduate Student and Research Assistant
>> >>>>> Affective Computing
>> >>>>> Phone: 617-866-7226
>> >>>>> Room: E14-274G
>> >>>>> MIT Media Lab
>> >>>>> 75 Amherst st, Cambridge, MA, 02139
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>
>> >>> --
>> >>> Yadid Ayzenberg
>> >>> Graduate Student and Research Assistant
>> >>> Affective Computing
>> >>> Phone: 617-866-7226
>> >>> Room: E14-274G
>> >>> MIT Media Lab
>> >>> 75 Amherst st, Cambridge, MA, 02139
>> >>>
>> >>>
>> >>>
>> >
>> >
>> > --
>> > Yadid Ayzenberg
>> > Graduate Student and Research Assistant
>> > Affective Computing
>> > Phone: 617-866-7226
>> > Room: E14-274G
>> > MIT Media Lab
>> > 75 Amherst st, Cambridge, MA, 02139
>> >
>> >
>> >
>
>
>
>
> --
> Yadid Ayzenberg
> Graduate Student and Research Assistant
> Affective Computing
> Phone: 617-866-7226
> Room: E14-274G
> MIT Media Lab
> 75 Amherst st, Cambridge, MA, 02139
>
>
>

Re: java.io.NotSerializableException on RDD count() in Java

Posted by Yadid Ayzenberg <ya...@media.mit.edu>.
Ah, I see. Thanks very much for your assistance, Patrick and Reynold.
As a workaround for now, I implemented the SC field as transient and it's
working fine.
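
For reference, a minimal sketch of that workaround, mirroring the
AnalyticsEngine code quoted further down (only the parts relevant to the fix
are shown):

import java.io.Serializable;

import org.apache.spark.api.java.JavaSparkContext;

public class AnalyticsEngine implements Serializable {

    // transient: Java serialization skips this field, so when the anonymous
    // inner function pulls the enclosing AnalyticsEngine instance into
    // closure serialization (through its hidden this$0 reference), the
    // non-serializable JavaSparkContext is left out of it.
    private transient JavaSparkContext sc;

    private AnalyticsEngine() {
        sc = new JavaSparkContext("local[4]", "TestSpark");
    }
}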

Yadid


On 11/3/13 9:05 PM, Reynold Xin wrote:
> Yea, so every inner class actually contains a field referencing the
> outer class. In your case, the anonymous class DoubleFlatMapFunction
> actually has a this$0 field referencing the outer
> class AnalyticsEngine, which is why Spark is trying to
> serialize AnalyticsEngine.
>
> In the Scala API, the closure (which is really just implemented as 
> anonymous classes) has a field called "$outer", and Spark uses a 
> "closure cleaner" that goes into the anonymous class to remove the 
> $outer field if it is not used in the closure itself. In Java, the 
> compiler generates a field called "this$0", and thus the closure 
> cleaner doesn't find it and can't "clean" it properly.
>
> I will work on a fix for the closure cleaner to clean this up as well. 
> In the meantime, you can work around this by either defining your anonymous
> class as a static class or marking the JavaSparkContext field as transient.
>
>
>
> On Sun, Nov 3, 2013 at 7:32 PM, Patrick Wendell <pwendell@gmail.com> wrote:
>
>     Hm, I think you are triggering a bug in the Java API where closures
>     may not be properly cleaned. I think @rxin has reproduced this,
>     deferring to him.
>
>     - Patrick
>
>     On Sun, Nov 3, 2013 at 5:25 PM, Yadid Ayzenberg
>     <yadid@media.mit.edu> wrote:
>     > Code is below. In the code, rdd.count() works, but rdd2.count()
>     fails.
>     >
>     > public class AnalyticsEngine  implements Serializable {
>     >
>     >     private static AnalyticsEngine engine=null;
>     >     private JavaSparkContext sc;
>     >
>     >     final Logger logger =
>     LoggerFactory.getLogger(AnalyticsEngine.class);
>     >     private Properties prop;
>     >
>     >     String db_host;
>     >
>     >     private AnalyticsEngine()
>     >     {
>     >         System.setProperty("spark.serializer",
>     > "org.apache.spark.serializer.KryoSerializer");
>     > System.setProperty("spark.kryo.registrator",
>     > "edu.mit.bsense.MyRegistrator");
>     >         sc = new JavaSparkContext("local[4]","TestSpark");
>     >         Properties prop = new Properties();
>     >         try {
>     >             prop.load(new FileInputStream("config.properties"));
>     >
>     >
>     >             db_host = prop.getProperty("database_host1");
>     >             logger.info("Database host: {}", db_host);
>     >         }  catch (FileNotFoundException ex)
>     >                 {
>     >                     logger.info("Could not read config.properties: " +
>     > ex.toString());
>     >
>     >                 } catch (IOException ex)
>     >                 {
>     >                     logger.info("Could not read config.properties: " +
>     > ex.toString());
>     >
>     >                 }
>     >     }
>     >
>     >
>     >         public void getData()
>     >         {
>     >         Configuration conf = new Configuration();
>     >
>     >         String conf_url = "mongodb://" + db_host +
>     "/test.data1"; //this is
>     > the data partition
>     >         conf.set("mongo.input.uri", conf_url);
>     >
>     >
>     >         conf.set("mongo.input.query",
>     > "{\"streamId\":\""+"13"+"\"},{\"data\":1}");
>     >         conf.set("mongo.input.split_size","64");
>     >
>     >         JavaPairRDD<Object,BSONObject> rdd =
>     sc.newAPIHadoopRDD(conf,
>     > MongoInputFormat.class, Object.class, BSONObject.class);
>     >
>     >         rdd.cache();
>     >
>     >         logger.info("Count of rdd: {}", rdd.count());
>     >
>     > logger.info("==========================================================================");
>     >
>     >
>     >
>     >         JavaDoubleRDD rdd2 =  rdd.flatMap( new
>     > DoubleFlatMapFunction<Tuple2<Object, BSONObject>>() {
>     >
>     >         @Override
>     >         public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>     >           BSONObject doc = e._2();
>     >           BasicDBList vals = (BasicDBList)doc.get("data");
>     >
>     >           List<Double> results = new ArrayList<Double>();
>     >           for (int i=0; i< vals.size();i++ )
>     > results.add((Double)((BasicDBList)vals.get(i)).get(0));
>     >
>     >           return results;
>     >
>     >         }
>     >         });
>     >
>     >         logger.info("Take: {}", rdd2.take(100));
>     >         logger.info("Count: {}", rdd2.count());
>     >
>     >
>     >     }
>     >
>     >     }
>     >
>     >
>     > On 11/3/13 8:19 PM, Patrick Wendell wrote:
>     >>
>     >> Thanks that would help. This would be consistent with there being a
>     >> reference to the SparkContext itself inside of the closure.
>     Just want
>     >> to make sure that's not the case.
>     >>
>     >> On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg
>     <yadid@media.mit.edu>
>     >> wrote:
>     >>>
>     >>> Im running in local[4] mode - so there are no slave machines.
>     Full stack
>     >>> trace:
>     >>>
>     >>>
>     >>> (run-main) org.apache.spark.SparkException: Job failed:
>     >>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>     >>> org.apache.spark.SparkException: Job failed:
>     >>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>     >>>      at
>     >>>
>     >>>
>     org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>     >>>      at
>     >>>
>     >>>
>     org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>     >>>      at
>     >>>
>     >>>
>     scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>     >>>      at
>     >>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     >>>      at
>     >>>
>     >>>
>     org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>     >>>      at
>     >>>
>     >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>     >>>      at
>     >>>
>     >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>     >>>      at
>     >>>
>     >>>
>     org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>     >>>      at
>     >>>
>     >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>     >>>      at
>     >>>
>     >>>
>     org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>     >>> [debug]     Thread run-main exited.
>     >>> [debug] Interrupting remaining threads (should be all daemons).
>     >>> [debug] Sandboxed run complete..
>     >>> java.lang.RuntimeException: Nonzero exit code: 1
>     >>>      at scala.sys.package$.error(package.scala:27)
>     >>>      at
>     sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     >>>      at
>     sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     >>>      at scala.Option.foreach(Option.scala:236)
>     >>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>     >>>      at sbt.Defaults$.toError(Defaults.scala:34)
>     >>>      at
>     >>>
>     >>>
>     sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>     >>>      at
>     >>>
>     >>>
>     sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>     >>>      at
>     scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>     >>>      at
>     >>> sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>     >>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
>     >>>      at
>     >>>
>     sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     >>>      at
>     >>>
>     sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     >>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>     >>>      at sbt.Execute.work(Execute.scala:244)
>     >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     >>>      at
>     >>>
>     >>>
>     sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>     >>>      at
>     sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>     >>>      at
>     >>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     >>>      at
>     >>>
>     java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>     >>>      at
>     >>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     >>>      at
>     >>>
>     >>>
>     java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>     >>>      at
>     >>>
>     >>>
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>     >>>      at java.lang.Thread.run(Thread.java:695)
>     >>>
>     >>> when I add implements Serializable to my class, I get the
>     following stack
>     >>> trace:
>     >>>
>     >>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>     >>> java.io.NotSerializableException:
>     >>> org.apache.spark.api.java.JavaSparkContext
>     >>> org.apache.spark.SparkException: Job failed:
>     >>> java.io.NotSerializableException:
>     >>> org.apache.spark.api.java.JavaSparkContext
>     >>>
>     >>>      at
>     >>>
>     >>>
>     org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>     >>>      at
>     >>>
>     >>>
>     org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>     >>>      at
>     >>>
>     >>>
>     scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>     >>>      at
>     >>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     >>>      at
>     >>>
>     >>>
>     org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>     >>>      at
>     >>>
>     >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>     >>>      at
>     >>>
>     >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>     >>>      at
>     >>>
>     >>>
>     org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>     >>>      at
>     >>>
>     >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>     >>>      at
>     >>>
>     >>>
>     org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>     >>> [debug]     Thread run-main exited.
>     >>> [debug] Interrupting remaining threads (should be all daemons).
>     >>> [debug] Sandboxed run complete..
>     >>> java.lang.RuntimeException: Nonzero exit code: 1
>     >>>      at scala.sys.package$.error(package.scala:27)
>     >>>      at
>     sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     >>>      at
>     sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     >>>      at scala.Option.foreach(Option.scala:236)
>     >>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>     >>>      at sbt.Defaults$.toError(Defaults.scala:34)
>     >>>      at
>     >>>
>     >>>
>     sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>     >>>      at
>     >>>
>     >>>
>     sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>     >>>      at
>     scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>     >>>      at
>     >>> sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>     >>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
>     >>>      at
>     >>>
>     sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     >>>      at
>     >>>
>     sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     >>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>     >>>      at sbt.Execute.work(Execute.scala:244)
>     >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     >>>      at
>     >>>
>     >>>
>     sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>     >>>      at
>     sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>     >>>      at
>     >>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     >>>      at
>     >>>
>     java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>     >>>      at
>     >>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     >>>      at
>     >>>
>     >>>
>     java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>     >>>      at
>     >>>
>     >>>
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>     >>>      at java.lang.Thread.run(Thread.java:695)
>     >>>
>     >>> I can post my code if that helps
>     >>>
>     >>>
>     >>>
>     >>> On 11/3/13 8:05 PM, Patrick Wendell wrote:
>     >>>>
>     >>>> If you look in the UI, are there failures on any of the
>     slaves that
>     >>>> you can give a  stack trace for? That would narrow down where the
>     >>>> serialization error is happening.
>     >>>>
>     >>>> Unfortunately this code path doesn't print a full stack trace
>     which
>     >>>> makes it harder to debug where the serialization error comes
>     from.
>     >>>>
>     >>>> Could you post all of your code?
>     >>>>
>     >>>> Also, just wondering, what happens if you just go ahead and add
>     >>>> "extends Serializable" to AnalyticsEngine class? It's
>     possible this is
>     >>>> happening during closure serialization, which will use the
>     closure
>     >>>> serializer (which is by default Java).
>     >>>>
>     >>>> - Patrick
>     >>>>
>     >>>> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg
>     <yadid@media.mit.edu>
>     >>>> wrote:
>     >>>>>
>     >>>>> yes, I tried that as well (it is currently registered with
>     Kryo)-
>     >>>>> although
>     >>>>> it doesnt make sense to me (and doesnt solve the problem). I
>     also made
>     >>>>> sure
>     >>>>> my registration was running:
>     >>>>> DEBUG org.apache.spark.serializer.KryoSerializer  - Running user
>     >>>>> registrator: edu.mit.bsense.MyRegistrator
>     >>>>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG
>     >>>>> org.apache.spark.serializer.KryoSerializer  - Running user
>     registrator:
>     >>>>> edu.mit.bsense.MyRegistrator
>     >>>>>
>     >>>>> edu.mit.bsense.AnalyticsEngine is the class containing the
>     SC which
>     >>>>> instantiates the RDDs and runs the map() and count().
>     >>>>> Can you explain why it needs to be serialized?
>     >>>>>
>     >>>>> Also, when running count() on my original RDD (pre map) I
>     get the right
>     >>>>> answer - this means the classes of data in the RDD are
>     serializable.
>     >>>>> It's only when I run map, and then count() on a new RDD do I
>     get this
>     >>>>> exception. My map does not introduce any new classes it -
>     just iterates
>     >>>>> over
>     >>>>> the existing data.
>     >>>>>
>     >>>>> Any ideas?
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>     >>>>>>
>     >>>>>> edu.mit.bsense.AnalyticsEngine
>     >>>>>>
>     >>>>>> Look at the exception. Basically, you'll need to register
>     every class
>     >>>>>> type that is recursively used by BSONObject.
>     >>>>>>
>     >>>>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg
>     <yadid@media.mit.edu>
>     >>>>>> wrote:
>     >>>>>>>
>     >>>>>>> Hi Patrick,
>     >>>>>>>
>     >>>>>>> I am in fact using Kryo and im registering
>      BSONObject.class (which
>     >>>>>>> is
>     >>>>>>> class
>     >>>>>>> holding the data) in my KryoRegistrator.
>     >>>>>>> Im not sure what other classes I should be registering.
>     >>>>>>>
>     >>>>>>> Thanks,
>     >>>>>>>
>     >>>>>>> Yadid
>     >>>>>>>
>     >>>>>>>
>     >>>>>>>
>     >>>>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>     >>>>>>>>
>     >>>>>>>> The problem is you are referencing a class that does not
>     "extend
>     >>>>>>>> serializable" in the data that you shuffle. Spark needs
>     to send all
>     >>>>>>>> shuffle data over the network, so it needs to know how to
>     serialize
>     >>>>>>>> them.
>     >>>>>>>>
>     >>>>>>>> One option is to use Kryo for network serialization as
>     described
>     >>>>>>>> here
>     >>>>>>>> - you'll have to register all the class that get
>     serialized though.
>     >>>>>>>>
>     >>>>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>     >>>>>>>>
>     >>>>>>>> Another option is to write a wrapper class that "extends
>     >>>>>>>> externalizable" and write the serialization yourself.
>     >>>>>>>>
>     >>>>>>>> - Patrick
>     >>>>>>>>
>     >>>>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg
>     >>>>>>>> <yadid@media.mit.edu>
>     >>>>>>>> wrote:
>     >>>>>>>>>
>     >>>>>>>>> Hi All,
>     >>>>>>>>>
>     >>>>>>>>> My original RDD contains arrays of doubles. When applying
>     a count()
>     >>>>>>>>> operator
>     >>>>>>>>> to the original RDD I get the result as expected.
>     >>>>>>>>> However when I run a map on the original RDD in order to
>     generate a
>     >>>>>>>>> new
>     >>>>>>>>> RDD
>     >>>>>>>>> with only the first element of each array, and try to
>     apply count()
>     >>>>>>>>> to
>     >>>>>>>>> the
>     >>>>>>>>> new generated RDD I get the following exception:
>     >>>>>>>>>
>     >>>>>>>>> 19829 [run-main] INFO
>      org.apache.spark.scheduler.DAGScheduler  -
>     >>>>>>>>> Failed
>     >>>>>>>>> to
>     >>>>>>>>> run count at AnalyticsEngine.java:133
>     >>>>>>>>> [error] (run-main) org.apache.spark.SparkException: Job
>     failed:
>     >>>>>>>>> java.io.NotSerializableException:
>     edu.mit.bsense.AnalyticsEngine
>     >>>>>>>>> org.apache.spark.SparkException: Job failed:
>     >>>>>>>>> java.io.NotSerializableException:
>     edu.mit.bsense.AnalyticsEngine
>     >>>>>>>>>         at
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>     >>>>>>>>>         at
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>     >>>>>>>>>         at
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>     >>>>>>>>>         at
>     >>>>>>>>>
>     scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     >>>>>>>>>         at
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>     >>>>>>>>>         at
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>     >>>>>>>>>         at
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>     >>>>>>>>>         at
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>     >>>>>>>>>         at
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>     >>>>>>>>>         at
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>> If I run a take() operation on the new RDD I receive the
>     results as
>     >>>>>>>>> expected. Here is my code:
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>> JavaRDD<Double> rdd2 =  rdd.flatMap( new
>     >>>>>>>>> FlatMapFunction<Tuple2<Object,
>     >>>>>>>>> BSONObject>, Double>() {
>     >>>>>>>>> @Override
>     >>>>>>>>>             public Iterable<Double> call(Tuple2<Object,
>     BSONObject>
>     >>>>>>>>> e)
>     >>>>>>>>> {
>     >>>>>>>>> BSONObject doc = e._2();
>     >>>>>>>>> List<List<Double>> vals =
>     >>>>>>>>> (List<List<Double>>)doc.get("data");
>     >>>>>>>>> List<Double> results = new ArrayList<Double>();
>     >>>>>>>>>               for (int i=0; i< vals.size();i++ )
>     >>>>>>>>> results.add((Double)vals.get(i).get(0));
>     >>>>>>>>> return results;
>     >>>>>>>>>
>     >>>>>>>>>             }
>     >>>>>>>>>             });
>     >>>>>>>>>
>     >>>>>>>>>             logger.info("Take: {}", rdd2.take(100));
>     >>>>>>>>>             logger.info("Count: {}", rdd2.count());
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>> Any ideas on what I am doing wrong ?
>     >>>>>>>>>
>     >>>>>>>>> Thanks,
>     >>>>>>>>>
>     >>>>>>>>> Yadid
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>> --
>     >>>>>>>>> Yadid Ayzenberg
>     >>>>>>>>> Graduate Student and Research Assistant
>     >>>>>>>>> Affective Computing
>     >>>>>>>>> Phone: 617-866-7226
>     >>>>>>>>> Room: E14-274G
>     >>>>>>>>> MIT Media Lab
>     >>>>>>>>> 75 Amherst st, Cambridge, MA, 02139
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>> --
>     >>>>>>> Yadid Ayzenberg
>     >>>>>>> Graduate Student and Research Assistant
>     >>>>>>> Affective Computing
>     >>>>>>> Phone: 617-866-7226
>     >>>>>>> Room: E14-274G
>     >>>>>>> MIT Media Lab
>     >>>>>>> 75 Amherst st, Cambridge, MA, 02139
>     >>>>>>>
>     >>>>>>>
>     >>>>>>>
>     >>>>> --
>     >>>>> Yadid Ayzenberg
>     >>>>> Graduate Student and Research Assistant
>     >>>>> Affective Computing
>     >>>>> Phone: 617-866-7226
>     >>>>> Room: E14-274G
>     >>>>> MIT Media Lab
>     >>>>> 75 Amherst st, Cambridge, MA, 02139
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>
>     >>> --
>     >>> Yadid Ayzenberg
>     >>> Graduate Student and Research Assistant
>     >>> Affective Computing
>     >>> Phone: 617-866-7226
>     >>> Room: E14-274G
>     >>> MIT Media Lab
>     >>> 75 Amherst st, Cambridge, MA, 02139
>     >>>
>     >>>
>     >>>
>     >
>     >
>     > --
>     > Yadid Ayzenberg
>     > Graduate Student and Research Assistant
>     > Affective Computing
>     > Phone: 617-866-7226
>     > Room: E14-274G
>     > MIT Media Lab
>     > 75 Amherst st, Cambridge, MA, 02139
>     >
>     >
>     >
>
>


-- 
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139




Re: java.io.NotSerializableException on RDD count() in Java

Posted by Reynold Xin <rx...@apache.org>.
Yea, so every inner class actually contains a field referencing the outer
class. In your case, the anonymous class DoubleFlatMapFunction actually has
a this$0 field referencing the outer class AnalyticsEngine, which is why
Spark is trying to serialize AnalyticsEngine.

In the Scala API, the closure (which is really just implemented as
anonymous classes) has a field called "$outer", and Spark uses a "closure
cleaner" that goes into the anonymous class to remove the $outer field if
it is not used in the closure itself. In Java, the compiler generates a
field called "this$0", and thus the closure cleaner doesn't find it and
can't "clean" it properly.

I will work on a fix for the closure cleaner to clean this up as well.
In the meantime, you can work around this by either defining your anonymous
class as a static class or marking the JavaSparkContext field as transient.
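
To see the this$0 mechanism in plain Java, here is a small self-contained
sketch (ClosureDemo, Outer and Standalone are invented names standing in for
AnalyticsEngine and its function; they are not part of the thread's code):

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureDemo {

    interface SerializableRunnable extends Runnable, Serializable { }

    // Not serializable itself, standing in for AnalyticsEngine.
    static class Outer {
        int threshold = 42;

        // An anonymous inner class gets a hidden this$0 field pointing at the
        // enclosing Outer instance, so serializing the function drags the
        // whole Outer object along and fails.
        SerializableRunnable anonymous() {
            return new SerializableRunnable() {
                public void run() { System.out.println(threshold); }
            };
        }
    }

    // A static nested class has no this$0 field and serializes on its own.
    static class Standalone implements SerializableRunnable {
        public void run() { }
    }

    static void trySerialize(Object o) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o);
            System.out.println(o.getClass().getName() + ": ok");
        } catch (Exception ex) {
            System.out.println(o.getClass().getName() + ": " + ex);
        }
    }

    public static void main(String[] args) {
        trySerialize(new Outer().anonymous()); // NotSerializableException: ...$Outer
        trySerialize(new Standalone());        // ok
    }
}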



On Sun, Nov 3, 2013 at 7:32 PM, Patrick Wendell <pw...@gmail.com> wrote:

> Hm, I think you are triggering a bug in the Java API where closures
> may not be properly cleaned. I think @rxin has reproduced this,
> deferring to him.
>
> - Patrick
>
> On Sun, Nov 3, 2013 at 5:25 PM, Yadid Ayzenberg <ya...@media.mit.edu>
> wrote:
> > Code is below. In the code, rdd.count() works, but rdd2.count() fails.
> >
> > public class AnalyticsEngine  implements Serializable {
> >
> >     private static AnalyticsEngine engine=null;
> >     private JavaSparkContext sc;
> >
> >     final Logger logger = LoggerFactory.getLogger(AnalyticsEngine.class);
> >     private Properties prop;
> >
> >     String db_host;
> >
> >     private AnalyticsEngine()
> >     {
> >         System.setProperty("spark.serializer",
> > "org.apache.spark.serializer.KryoSerializer");
> >         System.setProperty("spark.kryo.registrator",
> > "edu.mit.bsense.MyRegistrator");
> >         sc = new JavaSparkContext("local[4]","TestSpark");
> >         Properties prop = new Properties();
> >         try {
> >             prop.load(new FileInputStream("config.properties"));
> >
> >
> >             db_host = prop.getProperty("database_host1");
> >             logger.info("Database host: {}", db_host);
> >         }  catch (FileNotFoundException ex)
> >                 {
> >                     logger.info("Could not read config.properties: " +
> > ex.toString());
> >
> >                 } catch (IOException ex)
> >                 {
> >                     logger.info("Could not read config.properties: " +
> > ex.toString());
> >
> >                 }
> >     }
> >
> >
> >         public void getData()
> >         {
> >         Configuration conf = new Configuration();
> >
> >         String conf_url = "mongodb://" + db_host + "/test.data1"; //this
> is
> > the data partition
> >         conf.set("mongo.input.uri", conf_url);
> >
> >
> >         conf.set("mongo.input.query",
> > "{\"streamId\":\""+"13"+"\"},{\"data\":1}");
> >         conf.set("mongo.input.split_size","64");
> >
> >         JavaPairRDD<Object,BSONObject> rdd = sc.newAPIHadoopRDD(conf,
> > MongoInputFormat.class, Object.class, BSONObject.class);
> >
> >         rdd.cache();
> >
> >         logger.info("Count of rdd: {}", rdd.count());
> >
> > logger.info("==========================================================================");
> >
> >
> >
> >         JavaDoubleRDD rdd2 =  rdd.flatMap( new
> > DoubleFlatMapFunction<Tuple2<Object, BSONObject>>() {
> >
> >         @Override
> >         public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
> >           BSONObject doc = e._2();
> >           BasicDBList vals = (BasicDBList)doc.get("data");
> >
> >           List<Double> results = new ArrayList<Double>();
> >           for (int i=0; i< vals.size();i++ )
> > results.add((Double)((BasicDBList)vals.get(i)).get(0));
> >
> >           return results;
> >
> >         }
> >         });
> >
> >         logger.info("Take: {}", rdd2.take(100));
> >         logger.info("Count: {}", rdd2.count());
> >
> >
> >     }
> >
> >     }
> >
> >
> > On 11/3/13 8:19 PM, Patrick Wendell wrote:
> >>
> >> Thanks that would help. This would be consistent with there being a
> >> reference to the SparkContext itself inside of the closure. Just want
> >> to make sure that's not the case.
> >>
> >> On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg <ya...@media.mit.edu>
> >> wrote:
> >>>
> >>> Im running in local[4] mode - so there are no slave machines. Full
> stack
> >>> trace:
> >>>
> >>>
> >>> (run-main) org.apache.spark.SparkException: Job failed:
> >>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
> >>> org.apache.spark.SparkException: Job failed:
> >>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
> >>>      at
> >>>
> >>>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
> >>>      at
> >>>
> >>>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
> >>>      at
> >>>
> >>>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> >>>      at
> >>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >>>      at
> >>>
> >>>
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
> >>>      at
> >>>
> >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
> >>>      at
> >>>
> >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
> >>>      at
> >>>
> >>>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
> >>>      at
> >>>
> >>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
> >>>      at
> >>>
> >>>
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
> >>> [debug]     Thread run-main exited.
> >>> [debug] Interrupting remaining threads (should be all daemons).
> >>> [debug] Sandboxed run complete..
> >>> java.lang.RuntimeException: Nonzero exit code: 1
> >>>      at scala.sys.package$.error(package.scala:27)
> >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
> >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
> >>>      at scala.Option.foreach(Option.scala:236)
> >>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
> >>>      at sbt.Defaults$.toError(Defaults.scala:34)
> >>>      at
> >>>
> >>>
> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
> >>>      at
> >>>
> >>>
> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
> >>>      at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
> >>>      at
> >>> sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
> >>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
> >>>      at
> >>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
> >>>      at
> >>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
> >>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
> >>>      at sbt.Execute.work(Execute.scala:244)
> >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
> >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
> >>>      at
> >>>
> >>>
> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
> >>>      at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
> >>>      at
> >>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>      at
> >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
> >>>      at
> >>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>      at
> >>>
> >>>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> >>>      at
> >>>
> >>>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> >>>      at java.lang.Thread.run(Thread.java:695)
> >>>
> >>> when I add implements Serializable to my class, I get the following
> stack
> >>> trace:
> >>>
> >>> error] (run-main) org.apache.spark.SparkException: Job failed:
> >>> java.io.NotSerializableException:
> >>> org.apache.spark.api.java.JavaSparkContext
> >>> org.apache.spark.SparkException: Job failed:
> >>> java.io.NotSerializableException:
> >>> org.apache.spark.api.java.JavaSparkContext
> >>>
> >>>      at
> >>>
> >>>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
> >>>      at
> >>>
> >>>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
> >>>      at
> >>>
> >>>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> >>>      at
> >>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >>>      at
> >>>
> >>>
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
> >>>      at
> >>>
> >>> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
> >>>      at
> >>>
> >>> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
> >>>      at
> >>>
> >>>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
> >>>      at
> >>>
> >>> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
> >>>      at
> >>>
> >>>
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
> >>> [debug]     Thread run-main exited.
> >>> [debug] Interrupting remaining threads (should be all daemons).
> >>> [debug] Sandboxed run complete..
> >>> java.lang.RuntimeException: Nonzero exit code: 1
> >>>      at scala.sys.package$.error(package.scala:27)
> >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
> >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
> >>>      at scala.Option.foreach(Option.scala:236)
> >>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
> >>>      at sbt.Defaults$.toError(Defaults.scala:34)
> >>>      at
> >>>
> >>>
> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
> >>>      at
> >>>
> >>>
> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
> >>>      at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
> >>>      at
> >>> sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
> >>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
> >>>      at
> >>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
> >>>      at
> >>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
> >>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
> >>>      at sbt.Execute.work(Execute.scala:244)
> >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
> >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
> >>>      at
> >>>
> >>>
> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
> >>>      at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
> >>>      at
> >>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>      at
> >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
> >>>      at
> >>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>      at
> >>>
> >>>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> >>>      at
> >>>
> >>>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> >>>      at java.lang.Thread.run(Thread.java:695)
> >>>
> >>> I can post my code if that helps
> >>>
> >>>
> >>>
> >>> On 11/3/13 8:05 PM, Patrick Wendell wrote:
> >>>>
> >>>> If you look in the UI, are there failures on any of the slaves that
> >>>> you can give a  stack trace for? That would narrow down where the
> >>>> serialization error is happening.
> >>>>
> >>>> Unfortunately this code path doesn't print a full stack trace which
> >>>> makes it harder to debug where the serialization error comes from.
> >>>>
> >>>> Could you post all of your code?
> >>>>
> >>>> Also, just wondering, what happens if you just go ahead and add
> >>>> "extends Serializable" to AnalyticsEngine class? It's possible this is
> >>>> happening during closure serialization, which will use the closure
> >>>> serializer (which is by default Java).
> >>>>
> >>>> - Patrick
> >>>>
> >>>> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <ya...@media.mit.edu>
> >>>> wrote:
> >>>>>
> >>>>> yes, I tried that as well (it is currently registered with Kryo)-
> >>>>> although
> >>>>> it doesnt make sense to me (and doesnt solve the problem). I also
> made
> >>>>> sure
> >>>>> my registration was running:
> >>>>> DEBUG org.apache.spark.serializer.KryoSerializer  - Running user
> >>>>> registrator: edu.mit.bsense.MyRegistrator
> >>>>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG
> >>>>> org.apache.spark.serializer.KryoSerializer  - Running user
> registrator:
> >>>>> edu.mit.bsense.MyRegistrator
> >>>>>
> >>>>> edu.mit.bsense.AnalyticsEngine is the class containing the SC which
> >>>>> instantiates the RDDs and runs the map() and count().
> >>>>> Can you explain why it needs to be serialized?
> >>>>>
> >>>>> Also, when running count() on my original RDD (pre map) I get the
> right
> >>>>> answer - this means the classes of data in the RDD are serializable.
> >>>>> It's only when I run map, and then count() on a new RDD do I get this
> >>>>> exception. My map does not introduce any new classes it - just
> iterates
> >>>>> over
> >>>>> the existing data.
> >>>>>
> >>>>> Any ideas?
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
> >>>>>>
> >>>>>> edu.mit.bsense.AnalyticsEngine
> >>>>>>
> >>>>>> Look at the exception. Basically, you'll need to register every
> class
> >>>>>> type that is recursively used by BSONObject.
> >>>>>>
> >>>>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <
> yadid@media.mit.edu>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Hi Patrick,
> >>>>>>>
> >>>>>>> I am in fact using Kryo and im registering  BSONObject.class (which
> >>>>>>> is
> >>>>>>> class
> >>>>>>> holding the data) in my KryoRegistrator.
> >>>>>>> Im not sure what other classes I should be registering.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Yadid
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
> >>>>>>>>
> >>>>>>>> The problem is you are referencing a class that does not "extend
> >>>>>>>> serializable" in the data that you shuffle. Spark needs to send
> all
> >>>>>>>> shuffle data over the network, so it needs to know how to
> serialize
> >>>>>>>> them.
> >>>>>>>>
> >>>>>>>> One option is to use Kryo for network serialization as described
> >>>>>>>> here
> >>>>>>>> - you'll have to register all the class that get serialized
> though.
> >>>>>>>>
> >>>>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
> >>>>>>>>
> >>>>>>>> Another option is to write a wrapper class that "extends
> >>>>>>>> externalizable" and write the serialization yourself.
> >>>>>>>>
> >>>>>>>> - Patrick
> >>>>>>>>
> >>>>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg
> >>>>>>>> <ya...@media.mit.edu>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi All,
> >>>>>>>>>
> >>>>>>>>> My original RDD contains arrays of doubles. when appying a
> count()
> >>>>>>>>> operator
> >>>>>>>>> to the original RDD I get the result as expected.
> >>>>>>>>> However when I run a map on the original RDD in order to
> generate a
> >>>>>>>>> new
> >>>>>>>>> RDD
> >>>>>>>>> with only the first element of each array, and try to apply
> count()
> >>>>>>>>> to
> >>>>>>>>> the
> >>>>>>>>> new generated RDD I get the following exception:
> >>>>>>>>>
> >>>>>>>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  -
> >>>>>>>>> Failed
> >>>>>>>>> to
> >>>>>>>>> run count at AnalyticsEngine.java:133
> >>>>>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
> >>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
> >>>>>>>>> org.apache.spark.SparkException: Job failed:
> >>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
> >>>>>>>>>         at
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
> >>>>>>>>>         at
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
> >>>>>>>>>         at
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
> >>>>>>>>>         at
> >>>>>>>>>
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >>>>>>>>>         at
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
> >>>>>>>>>         at
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
> >>>>>>>>>         at
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
> >>>>>>>>>         at
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
> >>>>>>>>>         at
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
> >>>>>>>>>         at
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> If a run a take() operation on the new RDD I receive the results
> as
> >>>>>>>>> expected. here is my code:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> JavaRDD<Double> rdd2 =  rdd.flatMap( new
> >>>>>>>>> FlatMapFunction<Tuple2<Object,
> >>>>>>>>> BSONObject>, Double>() {
> >>>>>>>>>             @Override
> >>>>>>>>>             public Iterable<Double> call(Tuple2<Object,
> BSONObject>
> >>>>>>>>> e)
> >>>>>>>>> {
> >>>>>>>>>               BSONObject doc = e._2();
> >>>>>>>>>               List<List<Double>> vals =
> >>>>>>>>> (List<List<Double>>)doc.get("data");
> >>>>>>>>>               List<Double> results = new ArrayList<Double>();
> >>>>>>>>>               for (int i=0; i< vals.size();i++ )
> >>>>>>>>>                   results.add((Double)vals.get(i).get(0));
> >>>>>>>>>               return results;
> >>>>>>>>>
> >>>>>>>>>             }
> >>>>>>>>>             });
> >>>>>>>>>
> >>>>>>>>>             logger.info("Take: {}", rdd2.take(100));
> >>>>>>>>>             logger.info("Count: {}", rdd2.count());
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Any ideas on what I am doing wrong ?
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Yadid
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Yadid Ayzenberg
> >>>>>>>>> Graduate Student and Research Assistant
> >>>>>>>>> Affective Computing
> >>>>>>>>> Phone: 617-866-7226
> >>>>>>>>> Room: E14-274G
> >>>>>>>>> MIT Media Lab
> >>>>>>>>> 75 Amherst st, Cambridge, MA, 02139
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>> --
> >>>>>>> Yadid Ayzenberg
> >>>>>>> Graduate Student and Research Assistant
> >>>>>>> Affective Computing
> >>>>>>> Phone: 617-866-7226
> >>>>>>> Room: E14-274G
> >>>>>>> MIT Media Lab
> >>>>>>> 75 Amherst st, Cambridge, MA, 02139
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>> --
> >>>>> Yadid Ayzenberg
> >>>>> Graduate Student and Research Assistant
> >>>>> Affective Computing
> >>>>> Phone: 617-866-7226
> >>>>> Room: E14-274G
> >>>>> MIT Media Lab
> >>>>> 75 Amherst st, Cambridge, MA, 02139
> >>>>>
> >>>>>
> >>>>>
> >>>
> >>> --
> >>> Yadid Ayzenberg
> >>> Graduate Student and Research Assistant
> >>> Affective Computing
> >>> Phone: 617-866-7226
> >>> Room: E14-274G
> >>> MIT Media Lab
> >>> 75 Amherst st, Cambridge, MA, 02139
> >>>
> >>>
> >>>
> >
> >
> > --
> > Yadid Ayzenberg
> > Graduate Student and Research Assistant
> > Affective Computing
> > Phone: 617-866-7226
> > Room: E14-274G
> > MIT Media Lab
> > 75 Amherst st, Cambridge, MA, 02139
> >
> >
> >
>

Re: java.io.NotSerializableException on RDD count() in Java

Posted by Patrick Wendell <pw...@gmail.com>.
Hm, I think you are triggering a bug in the Java API where closures
may not be properly cleaned. I think @rxin has reproduced this,
deferring to him.

- Patrick
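
A minimal sketch of a workaround while that closure-cleaning bug stands, assuming the 0.8-era Java API used elsewhere in this thread: move the function out of the anonymous inner class and into a named top-level (or static nested) class. Such a class carries no hidden reference to the enclosing AnalyticsEngine, so only the function object itself gets serialized. The class name FirstElementExtractor is made up for illustration and does not appear anywhere in the thread.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.DoubleFlatMapFunction;
import org.bson.BSONObject;
import com.mongodb.BasicDBList;
import scala.Tuple2;

// Hypothetical helper, not from the thread: a top-level class has no implicit
// this$0 field, so serializing it does not drag in AnalyticsEngine or its
// JavaSparkContext. If your Spark version defines DoubleFlatMapFunction as an
// interface rather than an abstract class, change "extends" to "implements".
public class FirstElementExtractor
        extends DoubleFlatMapFunction<Tuple2<Object, BSONObject>> {
    @Override
    public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
        BSONObject doc = e._2();
        BasicDBList vals = (BasicDBList) doc.get("data");
        List<Double> results = new ArrayList<Double>(vals.size());
        for (int i = 0; i < vals.size(); i++) {
            results.add((Double) ((BasicDBList) vals.get(i)).get(0));
        }
        return results;
    }
}

// Usage inside getData() would then be:
//     JavaDoubleRDD rdd2 = rdd.flatMap(new FirstElementExtractor());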

On Sun, Nov 3, 2013 at 5:25 PM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> code is below. in the code rdd.count() works, but rdd2.count() fails.
>
> public class AnalyticsEngine  implements Serializable {
>
>     private static AnalyticsEngine engine=null;
>     private JavaSparkContext sc;
>
>     final Logger logger = LoggerFactory.getLogger(AnalyticsEngine.class);
>     private Properties prop;
>
>     String db_host;
>
>     private AnalyticsEngine()
>     {
>         System.setProperty("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer");
>         System.setProperty("spark.kryo.registrator",
> "edu.mit.bsense.MyRegistrator");
>         sc = new JavaSparkContext("local[4]","TestSpark");
>         Properties prop = new Properties();
>         try {
>             prop.load(new FileInputStream("config.properties"));
>
>
>             db_host = prop.getProperty("database_host1");
>             logger.info("Database host: {}", db_host);
>         }  catch (FileNotFoundException ex)
>                 {
>                     logger.info("Could not read config.properties: " +
> ex.toString());
>
>                 } catch (IOException ex)
>                 {
>                     logger.info("Could not read config.properties: " +
> ex.toString());
>
>                 }
>
>
>
>         public void getData(void)
>         {
>         Configuration conf = new Configuration();
>
>         String conf_url = "mongodb://" + db_host + "/test.data1"; //this is
> the data partition
>         conf.set("mongo.input.uri", conf_url);
>
>
>         conf.set("mongo.input.query",
> "{\"streamId\":\""+"13"+"\"},{\"data\":1}");
>         conf.set("mongo.input.split_size","64");
>
>         JavaPairRDD<Object,BSONObject> rdd = sc.newAPIHadoopRDD(conf,
> MongoInputFormat.class, Object.class, BSONObject.class);
>
>         rdd.cache();
>
>         logger.info("Count of rdd: {}", rdd.count());
>
> logger.info("==========================================================================");
>
>
>
>         JavaDoubleRDD rdd2 =  rdd.flatMap( new
> DoubleFlatMapFunction<Tuple2<Object, BSONObject>>() {
>
>         @Override
>         public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>           BSONObject doc = e._2();
>           BasicDBList vals = (BasicDBList)doc.get("data");
>
>           List<Double> results = new ArrayList<Double>();
>           for (int i=0; i< vals.size();i++ )
> results.add((Double)((BasicDBList)vals.get(i)).get(0));
>
>           return results;
>
>         }
>         });
>
>         logger.info("Take: {}", rdd2.take(100));
>         logger.info("Count: {}", rdd2.count());
>
>
>     }
>
>     }
>
>
> On 11/3/13 8:19 PM, Patrick Wendell wrote:
>>
>> Thanks that would help. This would be consistent with there being a
>> reference to the SparkContext itself inside of the closure. Just want
>> to make sure that's not the case.
>>
>> On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>> wrote:
>>>
>>> Im running in local[4] mode - so there are no slave machines. Full stack
>>> trace:
>>>
>>>
>>> (run-main) org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>> org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>      at
>>>
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>      at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>> [debug]     Thread run-main exited.
>>> [debug] Interrupting remaining threads (should be all daemons).
>>> [debug] Sandboxed run complete..
>>> java.lang.RuntimeException: Nonzero exit code: 1
>>>      at scala.sys.package$.error(package.scala:27)
>>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>>>      at scala.Option.foreach(Option.scala:236)
>>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>>>      at sbt.Defaults$.toError(Defaults.scala:34)
>>>      at
>>>
>>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>>>      at
>>>
>>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>>>      at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>>>      at
>>> sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
>>>      at
>>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>>>      at
>>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>>>      at sbt.Execute.work(Execute.scala:244)
>>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>>>      at
>>>
>>> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>>>      at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>>>      at
>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>      at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>>>      at
>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>      at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>>      at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>>      at java.lang.Thread.run(Thread.java:695)
>>>
>>> when I add implements Serializable to my class, I get the following stack
>>> trace:
>>>
>>> error] (run-main) org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException:
>>> org.apache.spark.api.java.JavaSparkContext
>>> org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException:
>>> org.apache.spark.api.java.JavaSparkContext
>>>
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>      at
>>>
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>      at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>> [debug]     Thread run-main exited.
>>> [debug] Interrupting remaining threads (should be all daemons).
>>> [debug] Sandboxed run complete..
>>> java.lang.RuntimeException: Nonzero exit code: 1
>>>      at scala.sys.package$.error(package.scala:27)
>>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>>>      at scala.Option.foreach(Option.scala:236)
>>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>>>      at sbt.Defaults$.toError(Defaults.scala:34)
>>>      at
>>>
>>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>>>      at
>>>
>>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>>>      at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>>>      at
>>> sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
>>>      at
>>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>>>      at
>>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>>>      at sbt.Execute.work(Execute.scala:244)
>>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>>>      at
>>>
>>> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>>>      at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>>>      at
>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>      at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>>>      at
>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>      at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>>      at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>>      at java.lang.Thread.run(Thread.java:695)
>>>
>>> I can post my code if that helps
>>>
>>>
>>>
>>> On 11/3/13 8:05 PM, Patrick Wendell wrote:
>>>>
>>>> If you look in the UI, are there failures on any of the slaves that
>>>> you can give a  stack trace for? That would narrow down where the
>>>> serialization error is happening.
>>>>
>>>> Unfortunately this code path doesn't print a full stack trace which
>>>> makes it harder to debug where the serialization error comes from.
>>>>
>>>> Could you post all of your code?
>>>>
>>>> Also, just wondering, what happens if you just go ahead and add
>>>> "extends Serializable" to AnalyticsEngine class? It's possible this is
>>>> happening during closure serialization, which will use the closure
>>>> serializer (which is by default Java).
>>>>
>>>> - Patrick
>>>>
>>>> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>>>> wrote:
>>>>>
>>>>> yes, I tried that as well (it is currently registered with Kryo)-
>>>>> although
>>>>> it doesnt make sense to me (and doesnt solve the problem). I also made
>>>>> sure
>>>>> my registration was running:
>>>>> DEBUG org.apache.spark.serializer.KryoSerializer  - Running user
>>>>> registrator: edu.mit.bsense.MyRegistrator
>>>>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG
>>>>> org.apache.spark.serializer.KryoSerializer  - Running user registrator:
>>>>> edu.mit.bsense.MyRegistrator
>>>>>
>>>>> edu.mit.bsense.AnalyticsEngine is the class containing the SC which
>>>>> instantiates the RDDs and runs the map() and count().
>>>>> Can you explain why it needs to be serialized?
>>>>>
>>>>> Also, when running count() on my original RDD (pre map) I get the right
>>>>> answer - this means the classes of data in the RDD are serializable.
>>>>> It's only when I run map, and then count() on a new RDD do I get this
>>>>> exception. My map does not introduce any new classes it - just iterates
>>>>> over
>>>>> the existing data.
>>>>>
>>>>> Any ideas?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>>>>>>
>>>>>> edu.mit.bsense.AnalyticsEngine
>>>>>>
>>>>>> Look at the exception. Basically, you'll need to register every class
>>>>>> type that is recursively used by BSONObject.
>>>>>>
>>>>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Patrick,
>>>>>>>
>>>>>>> I am in fact using Kryo and im registering  BSONObject.class (which
>>>>>>> is
>>>>>>> class
>>>>>>> holding the data) in my KryoRegistrator.
>>>>>>> Im not sure what other classes I should be registering.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Yadid
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>>>>>>>>
>>>>>>>> The problem is you are referencing a class that does not "extend
>>>>>>>> serializable" in the data that you shuffle. Spark needs to send all
>>>>>>>> shuffle data over the network, so it needs to know how to serialize
>>>>>>>> them.
>>>>>>>>
>>>>>>>> One option is to use Kryo for network serialization as described
>>>>>>>> here
>>>>>>>> - you'll have to register all the class that get serialized though.
>>>>>>>>
>>>>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>>>>>>>>
>>>>>>>> Another option is to write a wrapper class that "extends
>>>>>>>> externalizable" and write the serialization yourself.
>>>>>>>>
>>>>>>>> - Patrick
>>>>>>>>
>>>>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg
>>>>>>>> <ya...@media.mit.edu>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> My original RDD contains arrays of doubles. when appying a count()
>>>>>>>>> operator
>>>>>>>>> to the original RDD I get the result as expected.
>>>>>>>>> However when I run a map on the original RDD in order to generate a
>>>>>>>>> new
>>>>>>>>> RDD
>>>>>>>>> with only the first element of each array, and try to apply count()
>>>>>>>>> to
>>>>>>>>> the
>>>>>>>>> new generated RDD I get the following exception:
>>>>>>>>>
>>>>>>>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  -
>>>>>>>>> Failed
>>>>>>>>> to
>>>>>>>>> run count at AnalyticsEngine.java:133
>>>>>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>>>>> org.apache.spark.SparkException: Job failed:
>>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>>>>>>>         at
>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>>>>>>         at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> If a run a take() operation on the new RDD I receive the results as
>>>>>>>>> expected. here is my code:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> JavaRDD<Double> rdd2 =  rdd.flatMap( new
>>>>>>>>> FlatMapFunction<Tuple2<Object,
>>>>>>>>> BSONObject>, Double>() {
>>>>>>>>>             @Override
>>>>>>>>>             public Iterable<Double> call(Tuple2<Object, BSONObject>
>>>>>>>>> e)
>>>>>>>>> {
>>>>>>>>>               BSONObject doc = e._2();
>>>>>>>>>               List<List<Double>> vals =
>>>>>>>>> (List<List<Double>>)doc.get("data");
>>>>>>>>>               List<Double> results = new ArrayList<Double>();
>>>>>>>>>               for (int i=0; i< vals.size();i++ )
>>>>>>>>>                   results.add((Double)vals.get(i).get(0));
>>>>>>>>>               return results;
>>>>>>>>>
>>>>>>>>>             }
>>>>>>>>>             });
>>>>>>>>>
>>>>>>>>>             logger.info("Take: {}", rdd2.take(100));
>>>>>>>>>             logger.info("Count: {}", rdd2.count());
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Any ideas on what I am doing wrong ?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Yadid
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Yadid Ayzenberg
>>>>>>>>> Graduate Student and Research Assistant
>>>>>>>>> Affective Computing
>>>>>>>>> Phone: 617-866-7226
>>>>>>>>> Room: E14-274G
>>>>>>>>> MIT Media Lab
>>>>>>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>> --
>>>>>>> Yadid Ayzenberg
>>>>>>> Graduate Student and Research Assistant
>>>>>>> Affective Computing
>>>>>>> Phone: 617-866-7226
>>>>>>> Room: E14-274G
>>>>>>> MIT Media Lab
>>>>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>>>>
>>>>>>>
>>>>>>>
>>>>> --
>>>>> Yadid Ayzenberg
>>>>> Graduate Student and Research Assistant
>>>>> Affective Computing
>>>>> Phone: 617-866-7226
>>>>> Room: E14-274G
>>>>> MIT Media Lab
>>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Yadid Ayzenberg
>>> Graduate Student and Research Assistant
>>> Affective Computing
>>> Phone: 617-866-7226
>>> Room: E14-274G
>>> MIT Media Lab
>>> 75 Amherst st, Cambridge, MA, 02139
>>>
>>>
>>>
>
>
> --
> Yadid Ayzenberg
> Graduate Student and Research Assistant
> Affective Computing
> Phone: 617-866-7226
> Room: E14-274G
> MIT Media Lab
> 75 Amherst st, Cambridge, MA, 02139
>
>
>

Re: java.io.NotSerializableException on RDD count() in Java

Posted by Yadid Ayzenberg <ya...@media.mit.edu>.
The code is below; in it, rdd.count() works, but rdd2.count() fails.

public class AnalyticsEngine implements Serializable {

    private static AnalyticsEngine engine = null;
    private JavaSparkContext sc;

    final Logger logger = LoggerFactory.getLogger(AnalyticsEngine.class);
    private Properties prop;

    String db_host;

    private AnalyticsEngine()
    {
        System.setProperty("spark.serializer",
                "org.apache.spark.serializer.KryoSerializer");
        System.setProperty("spark.kryo.registrator",
                "edu.mit.bsense.MyRegistrator");
        sc = new JavaSparkContext("local[4]", "TestSpark");
        Properties prop = new Properties();
        try {
            prop.load(new FileInputStream("config.properties"));

            db_host = prop.getProperty("database_host1");
            logger.info("Database host: {}", db_host);
        } catch (FileNotFoundException ex) {
            logger.info("Could not read config.properties: " + ex.toString());
        } catch (IOException ex) {
            logger.info("Could not read config.properties: " + ex.toString());
        }
    }

    public void getData()
    {
        Configuration conf = new Configuration();

        String conf_url = "mongodb://" + db_host + "/test.data1"; // this is the data partition
        conf.set("mongo.input.uri", conf_url);

        conf.set("mongo.input.query",
                "{\"streamId\":\"" + "13" + "\"},{\"data\":1}");
        conf.set("mongo.input.split_size", "64");

        JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf,
                MongoInputFormat.class, Object.class, BSONObject.class);

        rdd.cache();

        logger.info("Count of rdd: {}", rdd.count());

        logger.info("==========================================================================");

        // The anonymous function below is what fails to serialize when
        // rdd2.count() is called.
        JavaDoubleRDD rdd2 = rdd.flatMap(new DoubleFlatMapFunction<Tuple2<Object, BSONObject>>() {
            @Override
            public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
                BSONObject doc = e._2();
                BasicDBList vals = (BasicDBList) doc.get("data");

                List<Double> results = new ArrayList<Double>();
                for (int i = 0; i < vals.size(); i++)
                    results.add((Double) ((BasicDBList) vals.get(i)).get(0));

                return results;
            }
        });

        logger.info("Take: {}", rdd2.take(100));
        logger.info("Count: {}", rdd2.count());
    }
}
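
The edu.mit.bsense.MyRegistrator referenced in the constructor is not shown anywhere in the thread. For context, a minimal sketch of what such a registrator could look like, following Patrick's earlier advice to register the classes that BSONObject recursively contains, is below; the concrete list of classes is an assumption based on the mongo-java-driver types used above, not something confirmed in the thread. Kryo only covers data serialization here: the task closure itself still goes through Java serialization, which is consistent with registering BSONObject alone not making the exception go away.

package edu.mit.bsense;

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import org.bson.BasicBSONObject;

// Sketch of a possible MyRegistrator; the real one is not included in the thread.
public class MyRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        // Register the concrete classes that actually travel inside the RDD.
        kryo.register(BasicBSONObject.class);
        kryo.register(BasicDBObject.class);
        kryo.register(BasicDBList.class);
    }
}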


On 11/3/13 8:19 PM, Patrick Wendell wrote:
> Thanks that would help. This would be consistent with there being a
> reference to the SparkContext itself inside of the closure. Just want
> to make sure that's not the case.
>
> On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
>> Im running in local[4] mode - so there are no slave machines. Full stack
>> trace:
>>
>>
>> (run-main) org.apache.spark.SparkException: Job failed:
>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>> org.apache.spark.SparkException: Job failed:
>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>      at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>      at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>      at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>      at
>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>> [debug]     Thread run-main exited.
>> [debug] Interrupting remaining threads (should be all daemons).
>> [debug] Sandboxed run complete..
>> java.lang.RuntimeException: Nonzero exit code: 1
>>      at scala.sys.package$.error(package.scala:27)
>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>>      at scala.Option.foreach(Option.scala:236)
>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>>      at sbt.Defaults$.toError(Defaults.scala:34)
>>      at
>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>>      at
>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>>      at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>>      at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
>>      at
>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>>      at
>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>>      at sbt.Execute.work(Execute.scala:244)
>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>>      at
>> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>>      at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>>      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>      at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>>      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>      at
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>      at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>      at java.lang.Thread.run(Thread.java:695)
>>
>> when I add implements Serializable to my class, I get the following stack
>> trace:
>>
>> error] (run-main) org.apache.spark.SparkException: Job failed:
>> java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
>> org.apache.spark.SparkException: Job failed:
>> java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
>>
>>      at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>      at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>      at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>      at
>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>> [debug]     Thread run-main exited.
>> [debug] Interrupting remaining threads (should be all daemons).
>> [debug] Sandboxed run complete..
>> java.lang.RuntimeException: Nonzero exit code: 1
>>      at scala.sys.package$.error(package.scala:27)
>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>>      at scala.Option.foreach(Option.scala:236)
>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>>      at sbt.Defaults$.toError(Defaults.scala:34)
>>      at
>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>>      at
>> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>>      at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>>      at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
>>      at
>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>>      at
>> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>>      at sbt.Execute.work(Execute.scala:244)
>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>>      at
>> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>>      at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>>      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>      at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>>      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>      at
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>>      at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>>      at java.lang.Thread.run(Thread.java:695)
>>
>> I can post my code if that helps
>>
>>
>>
>> On 11/3/13 8:05 PM, Patrick Wendell wrote:
>>> If you look in the UI, are there failures on any of the slaves that
>>> you can give a  stack trace for? That would narrow down where the
>>> serialization error is happening.
>>>
>>> Unfortunately this code path doesn't print a full stack trace which
>>> makes it harder to debug where the serialization error comes from.
>>>
>>> Could you post all of your code?
>>>
>>> Also, just wondering, what happens if you just go ahead and add
>>> "extends Serializable" to AnalyticsEngine class? It's possible this is
>>> happening during closure serialization, which will use the closure
>>> serializer (which is by default Java).
>>>
>>> - Patrick
>>>
>>> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>>> wrote:
>>>> yes, I tried that as well (it is currently registered with Kryo)-
>>>> although
>>>> it doesnt make sense to me (and doesnt solve the problem). I also made
>>>> sure
>>>> my registration was running:
>>>> DEBUG org.apache.spark.serializer.KryoSerializer  - Running user
>>>> registrator: edu.mit.bsense.MyRegistrator
>>>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG
>>>> org.apache.spark.serializer.KryoSerializer  - Running user registrator:
>>>> edu.mit.bsense.MyRegistrator
>>>>
>>>> edu.mit.bsense.AnalyticsEngine is the class containing the SC which
>>>> instantiates the RDDs and runs the map() and count().
>>>> Can you explain why it needs to be serialized?
>>>>
>>>> Also, when running count() on my original RDD (pre map) I get the right
>>>> answer - this means the classes of data in the RDD are serializable.
>>>> It's only when I run map, and then count() on a new RDD do I get this
>>>> exception. My map does not introduce any new classes it - just iterates
>>>> over
>>>> the existing data.
>>>>
>>>> Any ideas?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>>>>> edu.mit.bsense.AnalyticsEngine
>>>>>
>>>>> Look at the exception. Basically, you'll need to register every class
>>>>> type that is recursively used by BSONObject.
>>>>>
>>>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>>>>> wrote:
>>>>>> Hi Patrick,
>>>>>>
>>>>>> I am in fact using Kryo and im registering  BSONObject.class (which is
>>>>>> class
>>>>>> holding the data) in my KryoRegistrator.
>>>>>> Im not sure what other classes I should be registering.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Yadid
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>>>>>>> The problem is you are referencing a class that does not "extend
>>>>>>> serializable" in the data that you shuffle. Spark needs to send all
>>>>>>> shuffle data over the network, so it needs to know how to serialize
>>>>>>> them.
>>>>>>>
>>>>>>> One option is to use Kryo for network serialization as described here
>>>>>>> - you'll have to register all the class that get serialized though.
>>>>>>>
>>>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>>>>>>>
>>>>>>> Another option is to write a wrapper class that "extends
>>>>>>> externalizable" and write the serialization yourself.
>>>>>>>
>>>>>>> - Patrick
>>>>>>>
>>>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu>
>>>>>>> wrote:
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> My original RDD contains arrays of doubles. when appying a count()
>>>>>>>> operator
>>>>>>>> to the original RDD I get the result as expected.
>>>>>>>> However when I run a map on the original RDD in order to generate a
>>>>>>>> new
>>>>>>>> RDD
>>>>>>>> with only the first element of each array, and try to apply count()
>>>>>>>> to
>>>>>>>> the
>>>>>>>> new generated RDD I get the following exception:
>>>>>>>>
>>>>>>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  -
>>>>>>>> Failed
>>>>>>>> to
>>>>>>>> run count at AnalyticsEngine.java:133
>>>>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>>>> org.apache.spark.SparkException: Job failed:
>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>>>>         at
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>>>>>>         at
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>>>>>>         at
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>>>>>>         at
>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>         at
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>>>>>>         at
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>>>>>         at
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>>>>>         at
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>>>>>         at
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>>>>>         at
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>>>>>
>>>>>>>>
>>>>>>>> If a run a take() operation on the new RDD I receive the results as
>>>>>>>> expected. here is my code:
>>>>>>>>
>>>>>>>>
>>>>>>>> JavaRDD<Double> rdd2 =  rdd.flatMap( new
>>>>>>>> FlatMapFunction<Tuple2<Object,
>>>>>>>> BSONObject>, Double>() {
>>>>>>>>             @Override
>>>>>>>>             public Iterable<Double> call(Tuple2<Object, BSONObject> e)
>>>>>>>> {
>>>>>>>>               BSONObject doc = e._2();
>>>>>>>>               List<List<Double>> vals =
>>>>>>>> (List<List<Double>>)doc.get("data");
>>>>>>>>               List<Double> results = new ArrayList<Double>();
>>>>>>>>               for (int i=0; i< vals.size();i++ )
>>>>>>>>                   results.add((Double)vals.get(i).get(0));
>>>>>>>>               return results;
>>>>>>>>
>>>>>>>>             }
>>>>>>>>             });
>>>>>>>>
>>>>>>>>             logger.info("Take: {}", rdd2.take(100));
>>>>>>>>             logger.info("Count: {}", rdd2.count());
>>>>>>>>
>>>>>>>>
>>>>>>>> Any ideas on what I am doing wrong ?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Yadid
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Yadid Ayzenberg
>>>>>>>> Graduate Student and Research Assistant
>>>>>>>> Affective Computing
>>>>>>>> Phone: 617-866-7226
>>>>>>>> Room: E14-274G
>>>>>>>> MIT Media Lab
>>>>>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>> --
>>>>>> Yadid Ayzenberg
>>>>>> Graduate Student and Research Assistant
>>>>>> Affective Computing
>>>>>> Phone: 617-866-7226
>>>>>> Room: E14-274G
>>>>>> MIT Media Lab
>>>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>>>
>>>>>>
>>>>>>
>>>> --
>>>> Yadid Ayzenberg
>>>> Graduate Student and Research Assistant
>>>> Affective Computing
>>>> Phone: 617-866-7226
>>>> Room: E14-274G
>>>> MIT Media Lab
>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>
>>>>
>>>>
>>
>> --
>> Yadid Ayzenberg
>> Graduate Student and Research Assistant
>> Affective Computing
>> Phone: 617-866-7226
>> Room: E14-274G
>> MIT Media Lab
>> 75 Amherst st, Cambridge, MA, 02139
>>
>>
>>


-- 
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139




Re: java.io.NotSerializableException on RDD count() in Java

Posted by Patrick Wendell <pw...@gmail.com>.
Thanks that would help. This would be consistent with there being a
reference to the SparkContext itself inside of the closure. Just want
to make sure that's not the case.
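
To see the kind of reference being described, here is a standalone illustration in plain Java, independent of Spark and with made-up class names, of how an anonymous inner class created inside an instance method silently captures its enclosing object:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Outer plays the role of AnalyticsEngine: it is not Serializable.
public class Outer {

    // The anonymous Serializable below keeps an implicit this$0 reference to
    // the Outer instance that created it, even though it uses nothing from it.
    Serializable makeTask() {
        return new Serializable() {};
    }

    public static void main(String[] args) throws IOException {
        Serializable task = new Outer().makeTask();
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        // Fails with java.io.NotSerializableException: Outer, because writing
        // the anonymous object also tries to write the captured Outer instance.
        out.writeObject(task);
    }
}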

On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> Im running in local[4] mode - so there are no slave machines. Full stack
> trace:
>
>
> (run-main) org.apache.spark.SparkException: Job failed:
> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
> org.apache.spark.SparkException: Job failed:
> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>     at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>     at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>     at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>     at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>     at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
> [debug]     Thread run-main exited.
> [debug] Interrupting remaining threads (should be all daemons).
> [debug] Sandboxed run complete..
> java.lang.RuntimeException: Nonzero exit code: 1
>     at scala.sys.package$.error(package.scala:27)
>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     at scala.Option.foreach(Option.scala:236)
>     at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>     at sbt.Defaults$.toError(Defaults.scala:34)
>     at
> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>     at
> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>     at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>     at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>     at sbt.std.Transform$$anon$4.work(System.scala:64)
>     at
> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     at
> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>     at sbt.Execute.work(Execute.scala:244)
>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     at
> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>     at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>     at java.lang.Thread.run(Thread.java:695)
>
> when I add implements Serializable to my class, I get the following stack
> trace:
>
> error] (run-main) org.apache.spark.SparkException: Job failed:
> java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
> org.apache.spark.SparkException: Job failed:
> java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
>
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>     at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>     at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>     at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>     at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>     at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
> [debug]     Thread run-main exited.
> [debug] Interrupting remaining threads (should be all daemons).
> [debug] Sandboxed run complete..
> java.lang.RuntimeException: Nonzero exit code: 1
>     at scala.sys.package$.error(package.scala:27)
>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     at scala.Option.foreach(Option.scala:236)
>     at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>     at sbt.Defaults$.toError(Defaults.scala:34)
>     at
> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>     at
> sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>     at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>     at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>     at sbt.std.Transform$$anon$4.work(System.scala:64)
>     at
> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     at
> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>     at sbt.Execute.work(Execute.scala:244)
>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     at
> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>     at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>     at java.lang.Thread.run(Thread.java:695)
>
> I can post my code if that helps
>
>
>
> On 11/3/13 8:05 PM, Patrick Wendell wrote:
>>
>> If you look in the UI, are there failures on any of the slaves that
>> you can give a  stack trace for? That would narrow down where the
>> serialization error is happening.
>>
>> Unfortunately this code path doesn't print a full stack trace which
>> makes it harder to debug where the serialization error comes from.
>>
>> Could you post all of your code?
>>
>> Also, just wondering, what happens if you just go ahead and add
>> "extends Serializable" to AnalyticsEngine class? It's possible this is
>> happening during closure serialization, which will use the closure
>> serializer (which is by default Java).
>>
>> - Patrick
>>
>> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>> wrote:
>>>
>>> yes, I tried that as well (it is currently registered with Kryo)-
>>> although
>>> it doesnt make sense to me (and doesnt solve the problem). I also made
>>> sure
>>> my registration was running:
>>> DEBUG org.apache.spark.serializer.KryoSerializer  - Running user
>>> registrator: edu.mit.bsense.MyRegistrator
>>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG
>>> org.apache.spark.serializer.KryoSerializer  - Running user registrator:
>>> edu.mit.bsense.MyRegistrator
>>>
>>> edu.mit.bsense.AnalyticsEngine is the class containing the SC which
>>> instantiates the RDDs and runs the map() and count().
>>> Can you explain why it needs to be serialized?
>>>
>>> Also, when running count() on my original RDD (pre map) I get the right
>>> answer - this means the classes of data in the RDD are serializable.
>>> It's only when I run map, and then count() on a new RDD do I get this
>>> exception. My map does not introduce any new classes it - just iterates
>>> over
>>> the existing data.
>>>
>>> Any ideas?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>>>>
>>>> edu.mit.bsense.AnalyticsEngine
>>>>
>>>> Look at the exception. Basically, you'll need to register every class
>>>> type that is recursively used by BSONObject.
>>>>
>>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>>>> wrote:
>>>>>
>>>>> Hi Patrick,
>>>>>
>>>>> I am in fact using Kryo and im registering  BSONObject.class (which is
>>>>> class
>>>>> holding the data) in my KryoRegistrator.
>>>>> Im not sure what other classes I should be registering.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Yadid
>>>>>
>>>>>
>>>>>
>>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>>>>>>
>>>>>> The problem is you are referencing a class that does not "extend
>>>>>> serializable" in the data that you shuffle. Spark needs to send all
>>>>>> shuffle data over the network, so it needs to know how to serialize
>>>>>> them.
>>>>>>
>>>>>> One option is to use Kryo for network serialization as described here
>>>>>> - you'll have to register all the class that get serialized though.
>>>>>>
>>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>>>>>>
>>>>>> Another option is to write a wrapper class that "extends
>>>>>> externalizable" and write the serialization yourself.
>>>>>>
>>>>>> - Patrick
>>>>>>
>>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu>
>>>>>> wrote:
>>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> My original RDD contains arrays of doubles. when appying a count()
>>>>>>> operator
>>>>>>> to the original RDD I get the result as expected.
>>>>>>> However when I run a map on the original RDD in order to generate a
>>>>>>> new
>>>>>>> RDD
>>>>>>> with only the first element of each array, and try to apply count()
>>>>>>> to
>>>>>>> the
>>>>>>> new generated RDD I get the following exception:
>>>>>>>
>>>>>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  -
>>>>>>> Failed
>>>>>>> to
>>>>>>> run count at AnalyticsEngine.java:133
>>>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>>> org.apache.spark.SparkException: Job failed:
>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>>>        at
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>>>>>        at
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>>>>>        at
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>>>>>        at
>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>        at
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>>>>>        at
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>>>>        at
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>>>>        at
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>>>>        at
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>>>>        at
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>>>>
>>>>>>>
>>>>>>> If a run a take() operation on the new RDD I receive the results as
>>>>>>> expected. here is my code:
>>>>>>>
>>>>>>>
>>>>>>> JavaRDD<Double> rdd2 =  rdd.flatMap( new
>>>>>>> FlatMapFunction<Tuple2<Object,
>>>>>>> BSONObject>, Double>() {
>>>>>>>            @Override
>>>>>>>            public Iterable<Double> call(Tuple2<Object, BSONObject> e)
>>>>>>> {
>>>>>>>              BSONObject doc = e._2();
>>>>>>>              List<List<Double>> vals =
>>>>>>> (List<List<Double>>)doc.get("data");
>>>>>>>              List<Double> results = new ArrayList<Double>();
>>>>>>>              for (int i=0; i< vals.size();i++ )
>>>>>>>                  results.add((Double)vals.get(i).get(0));
>>>>>>>              return results;
>>>>>>>
>>>>>>>            }
>>>>>>>            });
>>>>>>>
>>>>>>>            logger.info("Take: {}", rdd2.take(100));
>>>>>>>            logger.info("Count: {}", rdd2.count());
>>>>>>>
>>>>>>>
>>>>>>> Any ideas on what I am doing wrong ?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Yadid
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Yadid Ayzenberg
>>>>>>> Graduate Student and Research Assistant
>>>>>>> Affective Computing
>>>>>>> Phone: 617-866-7226
>>>>>>> Room: E14-274G
>>>>>>> MIT Media Lab
>>>>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>>>>
>>>>>>>
>>>>>>>
>>>>> --
>>>>> Yadid Ayzenberg
>>>>> Graduate Student and Research Assistant
>>>>> Affective Computing
>>>>> Phone: 617-866-7226
>>>>> Room: E14-274G
>>>>> MIT Media Lab
>>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Yadid Ayzenberg
>>> Graduate Student and Research Assistant
>>> Affective Computing
>>> Phone: 617-866-7226
>>> Room: E14-274G
>>> MIT Media Lab
>>> 75 Amherst st, Cambridge, MA, 02139
>>>
>>>
>>>
>
>
> --
> Yadid Ayzenberg
> Graduate Student and Research Assistant
> Affective Computing
> Phone: 617-866-7226
> Room: E14-274G
> MIT Media Lab
> 75 Amherst st, Cambridge, MA, 02139
>
>
>

Re: java.io.NotSerializableException on RDD count() in Java

Posted by Yadid Ayzenberg <ya...@media.mit.edu>.
I'm running in local[4] mode, so there are no slave machines. Full stack
trace:

(run-main) org.apache.spark.SparkException: Job failed: 
java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
org.apache.spark.SparkException: Job failed: 
java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
     at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
     at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
     at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
     at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
     at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
     at 
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
     at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
     at 
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
[debug]     Thread run-main exited.
[debug] Interrupting remaining threads (should be all daemons).
[debug] Sandboxed run complete..
java.lang.RuntimeException: Nonzero exit code: 1
     at scala.sys.package$.error(package.scala:27)
     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
     at scala.Option.foreach(Option.scala:236)
     at sbt.BuildCommon$class.toError(Defaults.scala:1628)
     at sbt.Defaults$.toError(Defaults.scala:34)
     at 
sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
     at 
sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
     at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
     at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
     at sbt.std.Transform$$anon$4.work(System.scala:64)
     at 
sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
     at 
sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
     at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
     at sbt.Execute.work(Execute.scala:244)
     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
     at 
sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
     at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
     at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
     at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
     at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
     at java.lang.Thread.run(Thread.java:695)

When I add "implements Serializable" to my class, I get the following
stack trace:

[error] (run-main) org.apache.spark.SparkException: Job failed: 
java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
org.apache.spark.SparkException: Job failed: 
java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
     at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
     at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
     at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
     at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
     at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
     at 
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
     at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
     at 
org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
[debug]     Thread run-main exited.
[debug] Interrupting remaining threads (should be all daemons).
[debug] Sandboxed run complete..
java.lang.RuntimeException: Nonzero exit code: 1
     at scala.sys.package$.error(package.scala:27)
     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
     at scala.Option.foreach(Option.scala:236)
     at sbt.BuildCommon$class.toError(Defaults.scala:1628)
     at sbt.Defaults$.toError(Defaults.scala:34)
     at 
sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
     at 
sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
     at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
     at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
     at sbt.std.Transform$$anon$4.work(System.scala:64)
     at 
sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
     at 
sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
     at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
     at sbt.Execute.work(Execute.scala:244)
     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
     at 
sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
     at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
     at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
     at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
     at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
     at java.lang.Thread.run(Thread.java:695)

I can post my code if that helps.
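
A minimal sketch, with illustrative names rather than the actual project
code, of two ways to keep a non-serializable enclosing object and its
JavaSparkContext out of the serialized closure (the JavaPairRDD input type
is an assumption):

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.bson.BSONObject;

import scala.Tuple2;

public class EngineSketch implements Serializable {

    // Option 1: if the enclosing instance does end up inside a closure,
    // a transient field at least keeps the context out of Java serialization.
    private transient JavaSparkContext sc;

    // Option 2: build the function in a static method, so the anonymous
    // class has no hidden reference back to the EngineSketch instance.
    static FlatMapFunction<Tuple2<Object, BSONObject>, Double> firstElements() {
        return new FlatMapFunction<Tuple2<Object, BSONObject>, Double>() {
            @Override
            public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
                BSONObject doc = e._2();
                List<List<Double>> vals = (List<List<Double>>) doc.get("data");
                List<Double> results = new ArrayList<Double>();
                for (int i = 0; i < vals.size(); i++)
                    results.add((Double) vals.get(i).get(0));
                return results;
            }
        };
    }

    long countFirstElements(JavaPairRDD<Object, BSONObject> rdd) {
        JavaRDD<Double> rdd2 = rdd.flatMap(firstElements());
        return rdd2.count();
    }
}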


On 11/3/13 8:05 PM, Patrick Wendell wrote:
> If you look in the UI, are there failures on any of the slaves that
> you can give a  stack trace for? That would narrow down where the
> serialization error is happening.
>
> Unfortunately this code path doesn't print a full stack trace which
> makes it harder to debug where the serialization error comes from.
>
> Could you post all of your code?
>
> Also, just wondering, what happens if you just go ahead and add
> "extends Serializable" to AnalyticsEngine class? It's possible this is
> happening during closure serialization, which will use the closure
> serializer (which is by default Java).
>
> - Patrick
>
> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
>> yes, I tried that as well (it is currently registered with Kryo)- although
>> it doesnt make sense to me (and doesnt solve the problem). I also made sure
>> my registration was running:
>> DEBUG org.apache.spark.serializer.KryoSerializer  - Running user
>> registrator: edu.mit.bsense.MyRegistrator
>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG
>> org.apache.spark.serializer.KryoSerializer  - Running user registrator:
>> edu.mit.bsense.MyRegistrator
>>
>> edu.mit.bsense.AnalyticsEngine is the class containing the SC which
>> instantiates the RDDs and runs the map() and count().
>> Can you explain why it needs to be serialized?
>>
>> Also, when running count() on my original RDD (pre map) I get the right
>> answer - this means the classes of data in the RDD are serializable.
>> It's only when I run map, and then count() on a new RDD do I get this
>> exception. My map does not introduce any new classes it - just iterates over
>> the existing data.
>>
>> Any ideas?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>>> edu.mit.bsense.AnalyticsEngine
>>>
>>> Look at the exception. Basically, you'll need to register every class
>>> type that is recursively used by BSONObject.
>>>
>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>>> wrote:
>>>> Hi Patrick,
>>>>
>>>> I am in fact using Kryo and im registering  BSONObject.class (which is
>>>> class
>>>> holding the data) in my KryoRegistrator.
>>>> Im not sure what other classes I should be registering.
>>>>
>>>> Thanks,
>>>>
>>>> Yadid
>>>>
>>>>
>>>>
>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>>>>> The problem is you are referencing a class that does not "extend
>>>>> serializable" in the data that you shuffle. Spark needs to send all
>>>>> shuffle data over the network, so it needs to know how to serialize
>>>>> them.
>>>>>
>>>>> One option is to use Kryo for network serialization as described here
>>>>> - you'll have to register all the class that get serialized though.
>>>>>
>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>>>>>
>>>>> Another option is to write a wrapper class that "extends
>>>>> externalizable" and write the serialization yourself.
>>>>>
>>>>> - Patrick
>>>>>
>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu>
>>>>> wrote:
>>>>>> Hi All,
>>>>>>
>>>>>> My original RDD contains arrays of doubles. when appying a count()
>>>>>> operator
>>>>>> to the original RDD I get the result as expected.
>>>>>> However when I run a map on the original RDD in order to generate a new
>>>>>> RDD
>>>>>> with only the first element of each array, and try to apply count() to
>>>>>> the
>>>>>> new generated RDD I get the following exception:
>>>>>>
>>>>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  -
>>>>>> Failed
>>>>>> to
>>>>>> run count at AnalyticsEngine.java:133
>>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>> org.apache.spark.SparkException: Job failed:
>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>>        at
>>>>>>
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>>>>        at
>>>>>>
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>>>>        at
>>>>>>
>>>>>>
>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>>>>        at
>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>        at
>>>>>>
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>>>>        at
>>>>>>
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>>>        at
>>>>>>
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>>>        at
>>>>>>
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>>>        at
>>>>>>
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>>>        at
>>>>>>
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>>>
>>>>>>
>>>>>> If a run a take() operation on the new RDD I receive the results as
>>>>>> expected. here is my code:
>>>>>>
>>>>>>
>>>>>> JavaRDD<Double> rdd2 =  rdd.flatMap( new FlatMapFunction<Tuple2<Object,
>>>>>> BSONObject>, Double>() {
>>>>>>            @Override
>>>>>>            public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>>>>>>              BSONObject doc = e._2();
>>>>>>              List<List<Double>> vals =
>>>>>> (List<List<Double>>)doc.get("data");
>>>>>>              List<Double> results = new ArrayList<Double>();
>>>>>>              for (int i=0; i< vals.size();i++ )
>>>>>>                  results.add((Double)vals.get(i).get(0));
>>>>>>              return results;
>>>>>>
>>>>>>            }
>>>>>>            });
>>>>>>
>>>>>>            logger.info("Take: {}", rdd2.take(100));
>>>>>>            logger.info("Count: {}", rdd2.count());
>>>>>>
>>>>>>
>>>>>> Any ideas on what I am doing wrong ?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Yadid
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Yadid Ayzenberg
>>>>>> Graduate Student and Research Assistant
>>>>>> Affective Computing
>>>>>> Phone: 617-866-7226
>>>>>> Room: E14-274G
>>>>>> MIT Media Lab
>>>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>>>
>>>>>>
>>>>>>
>>>> --
>>>> Yadid Ayzenberg
>>>> Graduate Student and Research Assistant
>>>> Affective Computing
>>>> Phone: 617-866-7226
>>>> Room: E14-274G
>>>> MIT Media Lab
>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>
>>>>
>>>>
>>
>> --
>> Yadid Ayzenberg
>> Graduate Student and Research Assistant
>> Affective Computing
>> Phone: 617-866-7226
>> Room: E14-274G
>> MIT Media Lab
>> 75 Amherst st, Cambridge, MA, 02139
>>
>>
>>


-- 
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139




Re: java.io.NotSerializableException on RDD count() in Java

Posted by Patrick Wendell <pw...@gmail.com>.
If you look in the UI, are there failures on any of the slaves that
you can give a stack trace for? That would narrow down where the
serialization error is happening.

Unfortunately this code path doesn't print a full stack trace which
makes it harder to debug where the serialization error comes from.

Could you post all of your code?

Also, just wondering, what happens if you just go ahead and add
"extends Serializable" to the AnalyticsEngine class? It's possible this is
happening during closure serialization, which will use the closure
serializer (which is Java by default).
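
To make the distinction concrete, here is a minimal sketch using the
Spark 0.8-style system-property configuration (master and app name are
illustrative):

import org.apache.spark.api.java.JavaSparkContext;

public class SerializerConfigSketch {
    public static void main(String[] args) {
        // spark.serializer controls how shuffled and cached *data* is
        // serialized; closures still go through plain Java serialization,
        // so the function object and everything it captures must implement
        // java.io.Serializable.
        System.setProperty("spark.serializer",
                "org.apache.spark.serializer.KryoSerializer");
        System.setProperty("spark.kryo.registrator",
                "edu.mit.bsense.MyRegistrator");
        JavaSparkContext sc = new JavaSparkContext("local[4]", "serializer-demo");
        // ... build RDDs and run jobs here ...
        sc.stop();
    }
}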

- Patrick

On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> yes, I tried that as well (it is currently registered with Kryo)- although
> it doesnt make sense to me (and doesnt solve the problem). I also made sure
> my registration was running:
> DEBUG org.apache.spark.serializer.KryoSerializer  - Running user
> registrator: edu.mit.bsense.MyRegistrator
> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG
> org.apache.spark.serializer.KryoSerializer  - Running user registrator:
> edu.mit.bsense.MyRegistrator
>
> edu.mit.bsense.AnalyticsEngine is the class containing the SC which
> instantiates the RDDs and runs the map() and count().
> Can you explain why it needs to be serialized?
>
> Also, when running count() on my original RDD (pre map) I get the right
> answer - this means the classes of data in the RDD are serializable.
> It's only when I run map, and then count() on a new RDD do I get this
> exception. My map does not introduce any new classes it - just iterates over
> the existing data.
>
> Any ideas?
>
>
>
>
>
>
>
>
>
> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>>
>> edu.mit.bsense.AnalyticsEngine
>>
>> Look at the exception. Basically, you'll need to register every class
>> type that is recursively used by BSONObject.
>>
>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <ya...@media.mit.edu>
>> wrote:
>>>
>>> Hi Patrick,
>>>
>>> I am in fact using Kryo and im registering  BSONObject.class (which is
>>> class
>>> holding the data) in my KryoRegistrator.
>>> Im not sure what other classes I should be registering.
>>>
>>> Thanks,
>>>
>>> Yadid
>>>
>>>
>>>
>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>>>>
>>>> The problem is you are referencing a class that does not "extend
>>>> serializable" in the data that you shuffle. Spark needs to send all
>>>> shuffle data over the network, so it needs to know how to serialize
>>>> them.
>>>>
>>>> One option is to use Kryo for network serialization as described here
>>>> - you'll have to register all the class that get serialized though.
>>>>
>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>>>>
>>>> Another option is to write a wrapper class that "extends
>>>> externalizable" and write the serialization yourself.
>>>>
>>>> - Patrick
>>>>
>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu>
>>>> wrote:
>>>>>
>>>>> Hi All,
>>>>>
>>>>> My original RDD contains arrays of doubles. when appying a count()
>>>>> operator
>>>>> to the original RDD I get the result as expected.
>>>>> However when I run a map on the original RDD in order to generate a new
>>>>> RDD
>>>>> with only the first element of each array, and try to apply count() to
>>>>> the
>>>>> new generated RDD I get the following exception:
>>>>>
>>>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  -
>>>>> Failed
>>>>> to
>>>>> run count at AnalyticsEngine.java:133
>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>> org.apache.spark.SparkException: Job failed:
>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>>>       at
>>>>>
>>>>>
>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>>>       at
>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>>       at
>>>>>
>>>>>
>>>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>>
>>>>>
>>>>> If a run a take() operation on the new RDD I receive the results as
>>>>> expected. here is my code:
>>>>>
>>>>>
>>>>> JavaRDD<Double> rdd2 =  rdd.flatMap( new FlatMapFunction<Tuple2<Object,
>>>>> BSONObject>, Double>() {
>>>>>           @Override
>>>>>           public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>>>>>             BSONObject doc = e._2();
>>>>>             List<List<Double>> vals =
>>>>> (List<List<Double>>)doc.get("data");
>>>>>             List<Double> results = new ArrayList<Double>();
>>>>>             for (int i=0; i< vals.size();i++ )
>>>>>                 results.add((Double)vals.get(i).get(0));
>>>>>             return results;
>>>>>
>>>>>           }
>>>>>           });
>>>>>
>>>>>           logger.info("Take: {}", rdd2.take(100));
>>>>>           logger.info("Count: {}", rdd2.count());
>>>>>
>>>>>
>>>>> Any ideas on what I am doing wrong ?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Yadid
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Yadid Ayzenberg
>>>>> Graduate Student and Research Assistant
>>>>> Affective Computing
>>>>> Phone: 617-866-7226
>>>>> Room: E14-274G
>>>>> MIT Media Lab
>>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Yadid Ayzenberg
>>> Graduate Student and Research Assistant
>>> Affective Computing
>>> Phone: 617-866-7226
>>> Room: E14-274G
>>> MIT Media Lab
>>> 75 Amherst st, Cambridge, MA, 02139
>>>
>>>
>>>
>
>
> --
> Yadid Ayzenberg
> Graduate Student and Research Assistant
> Affective Computing
> Phone: 617-866-7226
> Room: E14-274G
> MIT Media Lab
> 75 Amherst st, Cambridge, MA, 02139
>
>
>

Re: java.io.NotSerializableException on RDD count() in Java

Posted by Yadid Ayzenberg <ya...@media.mit.edu>.
Yes, I tried that as well (it is currently registered with Kryo), 
although it doesn't make sense to me (and doesn't solve the problem). I 
also made sure my registration was running:
DEBUG org.apache.spark.serializer.KryoSerializer  - Running user 
registrator: edu.mit.bsense.MyRegistrator
7841 [spark-akka.actor.default-dispatcher-3] DEBUG 
org.apache.spark.serializer.KryoSerializer  - Running user registrator: 
edu.mit.bsense.MyRegistrator

edu.mit.bsense.AnalyticsEngine is the class containing the SC (SparkContext); it instantiates the RDDs and runs the map() and count().
Can you explain why it needs to be serialized?

Also, when running count() on my original RDD (pre-map) I get the right answer, which means the classes of the data in the RDD are serializable.
It's only when I run map() and then count() on the new RDD that I get this exception. My map does not introduce any new classes; it just iterates over the existing data.

Any ideas?
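
A minimal, self-contained illustration of why the enclosing class gets
dragged in when the function is an anonymous inner class (all names below
are made up for the demo and unrelated to the project code):

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.Callable;

public class OuterCaptureDemo {

    // Deliberately not Serializable, standing in for a driver-side class
    // that holds a SparkContext.
    static class Engine {
        Callable<Integer> makeTask() {
            // The anonymous class below gets a hidden this$0 field pointing
            // back at this Engine instance, even though it never uses it.
            return new SerializableCallable() {
                @Override
                public Integer call() {
                    return 42;
                }
            };
        }
    }

    interface SerializableCallable extends Callable<Integer>, Serializable {}

    public static void main(String[] args) throws Exception {
        Callable<Integer> task = new Engine().makeTask();
        ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream());
        // Fails with java.io.NotSerializableException: OuterCaptureDemo$Engine,
        // the same pattern as the NotSerializableException on the class that
        // merely encloses the Spark function.
        out.writeObject(task);
    }
}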








On 11/3/13 7:43 PM, Patrick Wendell wrote:
> edu.mit.bsense.AnalyticsEngine
>
> Look at the exception. Basically, you'll need to register every class
> type that is recursively used by BSONObject.
>
> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
>> Hi Patrick,
>>
>> I am in fact using Kryo and im registering  BSONObject.class (which is class
>> holding the data) in my KryoRegistrator.
>> Im not sure what other classes I should be registering.
>>
>> Thanks,
>>
>> Yadid
>>
>>
>>
>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>>> The problem is you are referencing a class that does not "extend
>>> serializable" in the data that you shuffle. Spark needs to send all
>>> shuffle data over the network, so it needs to know how to serialize
>>> them.
>>>
>>> One option is to use Kryo for network serialization as described here
>>> - you'll have to register all the class that get serialized though.
>>>
>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>>>
>>> Another option is to write a wrapper class that "extends
>>> externalizable" and write the serialization yourself.
>>>
>>> - Patrick
>>>
>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu>
>>> wrote:
>>>> Hi All,
>>>>
>>>> My original RDD contains arrays of doubles. when appying a count()
>>>> operator
>>>> to the original RDD I get the result as expected.
>>>> However when I run a map on the original RDD in order to generate a new
>>>> RDD
>>>> with only the first element of each array, and try to apply count() to
>>>> the
>>>> new generated RDD I get the following exception:
>>>>
>>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  - Failed
>>>> to
>>>> run count at AnalyticsEngine.java:133
>>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>> org.apache.spark.SparkException: Job failed:
>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>       at
>>>>
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>>       at
>>>>
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>>       at
>>>>
>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>>       at
>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>       at
>>>>
>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>>       at
>>>>
>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>       at
>>>>
>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>       at
>>>>
>>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>       at
>>>>
>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>       at
>>>>
>>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>
>>>>
>>>> If a run a take() operation on the new RDD I receive the results as
>>>> expected. here is my code:
>>>>
>>>>
>>>> JavaRDD<Double> rdd2 =  rdd.flatMap( new FlatMapFunction<Tuple2<Object,
>>>> BSONObject>, Double>() {
>>>>           @Override
>>>>           public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>>>>             BSONObject doc = e._2();
>>>>             List<List<Double>> vals = (List<List<Double>>)doc.get("data");
>>>>             List<Double> results = new ArrayList<Double>();
>>>>             for (int i=0; i< vals.size();i++ )
>>>>                 results.add((Double)vals.get(i).get(0));
>>>>             return results;
>>>>
>>>>           }
>>>>           });
>>>>
>>>>           logger.info("Take: {}", rdd2.take(100));
>>>>           logger.info("Count: {}", rdd2.count());
>>>>
>>>>
>>>> Any ideas on what I am doing wrong ?
>>>>
>>>> Thanks,
>>>>
>>>> Yadid
>>>>
>>>>
>>>>
>>>> --
>>>> Yadid Ayzenberg
>>>> Graduate Student and Research Assistant
>>>> Affective Computing
>>>> Phone: 617-866-7226
>>>> Room: E14-274G
>>>> MIT Media Lab
>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>
>>>>
>>>>
>>
>> --
>> Yadid Ayzenberg
>> Graduate Student and Research Assistant
>> Affective Computing
>> Phone: 617-866-7226
>> Room: E14-274G
>> MIT Media Lab
>> 75 Amherst st, Cambridge, MA, 02139
>>
>>
>>


-- 
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139




Re: java.io.NotSerializableException on RDD count() in Java

Posted by Patrick Wendell <pw...@gmail.com>.
edu.mit.bsense.AnalyticsEngine

Look at the exception. Basically, you'll need to register every class
type that is recursively used by BSONObject.
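
As a rough sketch of what that can look like in practice (the concrete
BSON types registered here are assumptions about what the documents
contain, and the class name is illustrative rather than the actual
edu.mit.bsense.MyRegistrator):

import java.util.ArrayList;

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;
import org.bson.BasicBSONObject;
import org.bson.types.BasicBSONList;

public class BsonKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        // Registering the BSONObject interface alone is not enough; the
        // concrete types that actually appear inside the documents (and the
        // collections nesting them) need to be registered as well.
        kryo.register(BasicBSONObject.class);
        kryo.register(BasicBSONList.class);
        kryo.register(ArrayList.class);
    }
}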

On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> Hi Patrick,
>
> I am in fact using Kryo and im registering  BSONObject.class (which is class
> holding the data) in my KryoRegistrator.
> Im not sure what other classes I should be registering.
>
> Thanks,
>
> Yadid
>
>
>
> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>>
>> The problem is you are referencing a class that does not "extend
>> serializable" in the data that you shuffle. Spark needs to send all
>> shuffle data over the network, so it needs to know how to serialize
>> them.
>>
>> One option is to use Kryo for network serialization as described here
>> - you'll have to register all the class that get serialized though.
>>
>> http://spark.incubator.apache.org/docs/latest/tuning.html
>>
>> Another option is to write a wrapper class that "extends
>> externalizable" and write the serialization yourself.
>>
>> - Patrick
>>
>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu>
>> wrote:
>>>
>>> Hi All,
>>>
>>> My original RDD contains arrays of doubles. when appying a count()
>>> operator
>>> to the original RDD I get the result as expected.
>>> However when I run a map on the original RDD in order to generate a new
>>> RDD
>>> with only the first element of each array, and try to apply count() to
>>> the
>>> new generated RDD I get the following exception:
>>>
>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  - Failed
>>> to
>>> run count at AnalyticsEngine.java:133
>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>> org.apache.spark.SparkException: Job failed:
>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>      at
>>>
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>      at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>      at
>>>
>>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>
>>>
>>> If a run a take() operation on the new RDD I receive the results as
>>> expected. here is my code:
>>>
>>>
>>> JavaRDD<Double> rdd2 =  rdd.flatMap( new FlatMapFunction<Tuple2<Object,
>>> BSONObject>, Double>() {
>>>          @Override
>>>          public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>>>            BSONObject doc = e._2();
>>>            List<List<Double>> vals = (List<List<Double>>)doc.get("data");
>>>            List<Double> results = new ArrayList<Double>();
>>>            for (int i=0; i< vals.size();i++ )
>>>                results.add((Double)vals.get(i).get(0));
>>>            return results;
>>>
>>>          }
>>>          });
>>>
>>>          logger.info("Take: {}", rdd2.take(100));
>>>          logger.info("Count: {}", rdd2.count());
>>>
>>>
>>> Any ideas on what I am doing wrong ?
>>>
>>> Thanks,
>>>
>>> Yadid
>>>
>>>
>>>
>>> --
>>> Yadid Ayzenberg
>>> Graduate Student and Research Assistant
>>> Affective Computing
>>> Phone: 617-866-7226
>>> Room: E14-274G
>>> MIT Media Lab
>>> 75 Amherst st, Cambridge, MA, 02139
>>>
>>>
>>>
>
>
> --
> Yadid Ayzenberg
> Graduate Student and Research Assistant
> Affective Computing
> Phone: 617-866-7226
> Room: E14-274G
> MIT Media Lab
> 75 Amherst st, Cambridge, MA, 02139
>
>
>

Re: java.io.NotSerializableException on RDD count() in Java

Posted by Yadid Ayzenberg <ya...@media.mit.edu>.
Hi Patrick,

I am in fact using Kryo, and I'm registering BSONObject.class (which is the 
class holding the data) in my KryoRegistrator.
I'm not sure what other classes I should be registering.

Thanks,

Yadid


On 11/3/13 7:23 PM, Patrick Wendell wrote:
> The problem is you are referencing a class that does not "extend
> serializable" in the data that you shuffle. Spark needs to send all
> shuffle data over the network, so it needs to know how to serialize
> them.
>
> One option is to use Kryo for network serialization as described here
> - you'll have to register all the class that get serialized though.
>
> http://spark.incubator.apache.org/docs/latest/tuning.html
>
> Another option is to write a wrapper class that "extends
> externalizable" and write the serialization yourself.
>
> - Patrick
>
> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
>> Hi All,
>>
>> My original RDD contains arrays of doubles. when appying a count() operator
>> to the original RDD I get the result as expected.
>> However when I run a map on the original RDD in order to generate a new RDD
>> with only the first element of each array, and try to apply count() to the
>> new generated RDD I get the following exception:
>>
>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  - Failed to
>> run count at AnalyticsEngine.java:133
>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>> org.apache.spark.SparkException: Job failed:
>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>      at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>      at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>      at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>      at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>      at
>> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>
>>
>> If a run a take() operation on the new RDD I receive the results as
>> expected. here is my code:
>>
>>
>> JavaRDD<Double> rdd2 =  rdd.flatMap( new FlatMapFunction<Tuple2<Object,
>> BSONObject>, Double>() {
>>          @Override
>>          public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>>            BSONObject doc = e._2();
>>            List<List<Double>> vals = (List<List<Double>>)doc.get("data");
>>            List<Double> results = new ArrayList<Double>();
>>            for (int i=0; i< vals.size();i++ )
>>                results.add((Double)vals.get(i).get(0));
>>            return results;
>>
>>          }
>>          });
>>
>>          logger.info("Take: {}", rdd2.take(100));
>>          logger.info("Count: {}", rdd2.count());
>>
>>
>> Any ideas on what I am doing wrong ?
>>
>> Thanks,
>>
>> Yadid
>>
>>
>>
>> --
>> Yadid Ayzenberg
>> Graduate Student and Research Assistant
>> Affective Computing
>> Phone: 617-866-7226
>> Room: E14-274G
>> MIT Media Lab
>> 75 Amherst st, Cambridge, MA, 02139
>>
>>
>>


-- 
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139




Re: java.io.NotSerializableException on RDD count() in Java

Posted by Patrick Wendell <pw...@gmail.com>.
The problem is that you are referencing a class that does not "extend
Serializable" in the data that you shuffle. Spark needs to send all
shuffle data over the network, so it needs to know how to serialize
it.

One option is to use Kryo for network serialization as described here
- you'll have to register all the classes that get serialized, though.

http://spark.incubator.apache.org/docs/latest/tuning.html

Another option is to write a wrapper class that "extends
Externalizable" and write the serialization yourself.
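
A bare-bones sketch of that second option, assuming the values worth
shipping are just the doubles pulled out of each document (the class name
is illustrative):

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

public class DoubleVector implements Externalizable {
    private double[] values;

    // Externalizable requires a public no-arg constructor.
    public DoubleVector() {}

    public DoubleVector(double[] values) {
        this.values = values;
    }

    public double[] getValues() {
        return values;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeInt(values.length);
        for (double v : values) {
            out.writeDouble(v);
        }
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        int n = in.readInt();
        values = new double[n];
        for (int i = 0; i < n; i++) {
            values[i] = in.readDouble();
        }
    }
}

Mapping the raw BSON records into a wrapper like this early in the job
keeps the hand-written serialization in one place and keeps the BSON
classes off the wire entirely.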

- Patrick

On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:
> Hi All,
>
> My original RDD contains arrays of doubles. when appying a count() operator
> to the original RDD I get the result as expected.
> However when I run a map on the original RDD in order to generate a new RDD
> with only the first element of each array, and try to apply count() to the
> new generated RDD I get the following exception:
>
> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  - Failed to
> run count at AnalyticsEngine.java:133
> [error] (run-main) org.apache.spark.SparkException: Job failed:
> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
> org.apache.spark.SparkException: Job failed:
> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>     at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>     at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>     at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>     at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>     at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>
>
> If a run a take() operation on the new RDD I receive the results as
> expected. here is my code:
>
>
> JavaRDD<Double> rdd2 =  rdd.flatMap( new FlatMapFunction<Tuple2<Object,
> BSONObject>, Double>() {
>         @Override
>         public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>           BSONObject doc = e._2();
>           List<List<Double>> vals = (List<List<Double>>)doc.get("data");
>           List<Double> results = new ArrayList<Double>();
>           for (int i=0; i< vals.size();i++ )
>               results.add((Double)vals.get(i).get(0));
>           return results;
>
>         }
>         });
>
>         logger.info("Take: {}", rdd2.take(100));
>         logger.info("Count: {}", rdd2.count());
>
>
> Any ideas on what I am doing wrong ?
>
> Thanks,
>
> Yadid
>
>
>
> --
> Yadid Ayzenberg
> Graduate Student and Research Assistant
> Affective Computing
> Phone: 617-866-7226
> Room: E14-274G
> MIT Media Lab
> 75 Amherst st, Cambridge, MA, 02139
>
>
>