Posted to user@spark.apache.org by Soumya Simanta <so...@gmail.com> on 2014/02/25 03:57:08 UTC

Filter on Date by comparing

I want to filter an RDD by comparing dates.

myRDD.filter( x => new DateTime(x.getCreatedAt).isAfter(start) ).count


I'm using the Joda-Time library, but I get an exception about a Joda-Time
class not being serializable.

Is there a way to configure this, or is there an easier alternative for this problem?


org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.joda.time.format.DateTimeFormatter
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: Filter on Date by comparing

Posted by Andrew Ash <an...@andrewash.com>.
It's in the data serialization section of the tuning guide, here:

http://spark.incubator.apache.org/docs/latest/tuning.html#data-serialization



Re: Filter on Date by comparing

Posted by Soumya Simanta <so...@gmail.com>.
Thanks, Andrew. I was expecting this to be the issue.
Are there any pointers on how to switch the serialization to Kryo?
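
For readers landing on this thread later: per the tuning guide linked in the other reply, switching to Kryo in the Spark 0.9.x-era API looks roughly like the sketch below. The app name and the registrator class name (`com.example.MyRegistrator`) are hypothetical placeholders, not from this thread.

```scala
// Rough sketch against the Spark 0.9.x API; substitute your own class names.
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Register the types your closures/RDDs carry, e.g. Joda's DateTime.
    kryo.register(classOf[org.joda.time.DateTime])
  }
}

val conf = new SparkConf()
  .setAppName("date-filter") // hypothetical app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyRegistrator")
val sc = new SparkContext(conf)
```

Registering classes is optional but avoids Kryo writing full class names into every serialized object.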


Re: Filter on Date by comparing

Posted by Ewen Cheslack-Postava <me...@ewencp.org>.
Or use RDD.filterWith to build what you need out of serializable parts,
so the construction runs only once per partition.
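
A sketch of what that could look like, reusing `myRDD`, `getCreatedAt`, and `start` from the original post (so those names are assumptions about the asker's data):

```scala
// filterWith's first argument runs once per partition (it receives the
// partition index), so the non-serializable formatter is constructed on
// the executor rather than shipped inside the closure.
import org.joda.time.format.DateTimeFormat

myRDD.filterWith(partitionIndex => DateTimeFormat.forPattern("yyyy-MM-dd")) {
  (x, formatter) => formatter.parseDateTime(x.getCreatedAt).isAfter(start)
}.count
```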

Re: Filter on Date by comparing

Posted by Andrew Ash <an...@andrewash.com>.
This is because Joda's DateTimeFormatter is not serializable (it doesn't
implement the empty marker interface Serializable):
http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html

One ugly thing I've done before is to instantiate a new DateTimeFormatter
for every record, like this:

myRDD.filter(x =>
  DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(x.getCreatedAt).isAfter(start)
).count

It's very inefficient, but it gets things closer to working.
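
A lighter-weight variant of the same idea (a common pattern, not something from this thread): hold the formatter in a top-level Scala object, so each executor JVM initializes it once and nothing non-serializable travels with the closure. `myRDD`, `getCreatedAt`, and `start` are again the asker's names.

```scala
// Vals in a top-level object are initialized once per JVM and referenced
// statically, so the formatter never needs to be serialized.
import org.joda.time.format.DateTimeFormat

object Formats {
  val IsoDay = DateTimeFormat.forPattern("yyyy-MM-dd")
}

myRDD.filter(x => Formats.IsoDay.parseDateTime(x.getCreatedAt).isAfter(start)).count
```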

Another thing to try is switching to Kryo serialization instead of the
default Java serialization, which I think does handle DateTimeFormatter
correctly.  Back in the 0.7.x days there was an issue where some of the
Joda classes wouldn't serialize correctly with Kryo, but I think that's
since been fixed:
https://groups.google.com/forum/#!topic/cascalog-user/35cdnNIamKU

HTH,
Andrew

