Posted to user@spark.apache.org by Ognen Duzlevski <og...@nengoiksvelzud.com> on 2014/03/07 04:39:28 UTC

Running actions in loops

Hello,

What is the general approach people take when trying to do analysis 
across multiple large files where the data to be extracted from a 
successive file depends on the data extracted from a previous file or 
set of files?

For example:
I have the following: a group of HDFS files, each 20+ GB in size. I need 
to extract event1 on day 1 from the first file and extract event2 from all 
remaining files in a period of successive dates, then do a calculation 
on the two events.
I then need to move on to day 2, extract event1 (with certain 
properties), take all following days, extract event2, and run a 
calculation against the previous day for all days in the period. And so on.

I have verified that the following (very naive) approach doesn't work:

def calcSimpleRetention(start: String, end: String, event1: String, event2: String): Map[String, List[Double]] = {
    val epd = new PipelineDate(end)
    val result = for {
      dt1 <- PipelineDate.getPeriod(new PipelineDate(start), epd)
      val f1 = sc.textFile(dt1.toJsonHdfsFileName)
      val e1 = f1.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1)
                 .map(line => (line.split(",")(2).split(":")(1).replace("\"",""), 0)).cache
      val c = e1.count.toDouble

      val intres = for {
        dt2 <- PipelineDate.getPeriod(dt1 + 1, epd)
        val f2 = sc.textFile(dt2.toJsonHdfsFileName)
        val e2 = f2.filter(_.split(",")(0).split(":")(1).replace("\"","") == event2)
                   .map(line => (line.split(",")(2).split(":")(1).replace("\"",""), 1))
        val e1e2 = e1.union(e2)
        val r = e1e2.groupByKey().filter(e => e._2.length > 1 && e._2.filter(_ == 0).length > 0).count.toDouble
      } yield (c / r) // get the retention rate
    } yield (dt1.toString -> intres)
    Map(result: _*)
  }

I am getting the following errors:
14/03/07 03:22:25 INFO SparkContext: Starting job: count at CountActor.scala:33
14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at CountActor.scala:33) with 140 output partitions (allowLocal=false)
14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:33)
14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List()
14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List()
14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at CountActor.scala:32), which has no missing parents
14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at CountActor.scala:33
14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I should mention that this code is fired off from an Akka actor (which 
is controlled by a Scalatra servlet).

Any ideas, recommendations, etc.? I am fairly new to Scala and M/R 
principles in general; it is fair to say that at this point I am still 
thinking from the point of view of an imperative programmer trying to fit 
a square peg into a round hole ;)
Ognen
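
One common way past this particular "Task not serializable" error is to keep every lambda free of references to the enclosing CountActor: move the per-line parsing into a standalone serializable object and build the RDDs in a plain function that takes the SparkContext and the strings it needs as parameters. The sketch below is illustrative only; EventLines and countEvent are made-up names, not part of the original code, and whether this alone is enough depends on how the nested for-comprehension desugars.

    import org.apache.spark.SparkContext

    // Hypothetical helper, not from the original post: a top-level object carries no
    // reference to the actor, so shipping its methods to the workers is harmless.
    object EventLines extends Serializable {
      def eventOf(line: String): String = line.split(",")(0).split(":")(1).replace("\"", "")
      def userOf(line: String): String  = line.split(",")(2).split(":")(1).replace("\"", "")
    }

    // Build the RDDs in a plain function that receives everything it needs as parameters,
    // instead of inside a CountActor method, so no lambda can accidentally capture the actor.
    def countEvent(sc: SparkContext, path: String, event: String): Double = {
      sc.textFile(path)
        .filter(line => EventLines.eventOf(line) == event)
        .map(line => (EventLines.userOf(line), 0))
        .count()
        .toDouble
    }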

Re: Running actions in loops

Posted by Nick Pentreath <ni...@gmail.com>.
There is a #3, which is to use mapPartitions and initialize one joda-time object per partition; that is less overhead for large objects.
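
A minimal sketch of that idea, assuming the joda-time parsing happens per record inside the lambdas of the original calcSimpleRetention (sc, dt1 and event1 are in scope there); the date field index and pattern below are made up:

    import org.joda.time.format.DateTimeFormat

    val ev1 = event1                                    // plain String, cheap to ship
    val e1 = sc.textFile(dt1.toJsonHdfsFileName).mapPartitions { lines =>
      // One non-serializable formatter per partition, built on the worker itself,
      // so it never has to travel with the closure.
      val fmt = DateTimeFormat.forPattern("yyyy-MM-dd")
      lines.filter(_.split(",")(0).split(":")(1).replace("\"", "") == ev1)
           .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""),
                         fmt.parseLocalDate(line.split(",")(3))))   // (3) is a made-up date field
    }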


Re: Running actions in loops

Posted by Mayur Rustagi <ma...@gmail.com>.
The whole function closure you want to apply to your RDD needs to be
serializable so that it can be serialized & sent to the workers that operate on
the RDD. joda-time objects cannot be serialized & sent, hence joda-time is
out of the picture. Two bad answers:
1. Initialize the joda-time objects for each row, do the work & destroy them; that
way they are only initialized while the job is running & never need to be sent across.
2. Write your own parser & hope the joda-time guys get their act together.
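
A rough sketch of option 1, assuming the parsing happens per record inside the map; the field positions and the date pattern are illustrative, not taken from the original code:

    import org.joda.time.format.DateTimeFormat

    val rows = sc.textFile(dt1.toJsonHdfsFileName)      // sc and dt1 as in the original code
    val withDates = rows.map { line =>
      // Created, used and discarded per record on the worker, so nothing joda-time-related
      // ever needs to be serialized and shipped with the closure.
      val fmt = DateTimeFormat.forPattern("yyyy-MM-dd")
      val user = line.split(",")(2).split(":")(1).replace("\"", "")
      (user, fmt.parseLocalDate(line.split(",")(3)))    // (3) is a made-up date field
    }

Nick's mapPartitions suggestion, earlier in the archive, pays that construction cost once per partition instead of once per row.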

Regards


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>




Re: Running actions in loops

Posted by Ognen Duzlevski <og...@nengoiksvelzud.com>.
Mayur, I had not thought of that. Yes, I use joda-time. What is the scope 
that this serialization issue applies to? Only the method making a call 
into / using such a library? The whole class that the method using such a 
library belongs to? Sorry if it is a dumb question :)

Ognen
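
In practice the scope is the closure itself plus whatever it references: if a lambda passed to filter or map touches a field or method of the enclosing class, Scala captures the whole enclosing instance and Spark then has to serialize it. A toy illustration, with a hypothetical Pipeline class that is not from this thread:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    class Pipeline(sc: SparkContext) {            // Pipeline itself is not Serializable
      val tag = "event1"                          // instance field

      // The lambda references this.tag, so the whole Pipeline instance is dragged into
      // the closure and Spark fails with a NotSerializableException.
      def bad(rdd: RDD[String]): Long  = rdd.filter(_.contains(tag)).count()

      // Copying the field into a local val first means the closure only carries a String.
      def good(rdd: RDD[String]): Long = {
        val t = tag
        rdd.filter(_.contains(t)).count()
      }
    }

A library object only matters here if it ends up referenced from inside such a closure, either directly or through a field of the enclosing class.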



Re: Running actions in loops

Posted by Mayur Rustagi <ma...@gmail.com>.
Most likely the job you are executing is not serializable; this typically
happens when you use a library that is not serializable. Are you using
any library like joda-time?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>




Re: Running actions in loops

Posted by Ognen Duzlevski <og...@plainvanillagames.com>.
It looks like the problem is in the filter task - is there anything 
special about filter()?

I have removed the filter line from the loops just to see if things will 
work and they do.

Does anyone have any ideas?

Thanks!
Ognen
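
One way to narrow this kind of thing down is to swap the real predicate for progressively simpler ones and see which variant still trips the serializer. A sketch, where path and ev are stand-ins for the values used in the original code:

    val path = dt1.toJsonHdfsFileName        // stand-in for whichever file is being read
    val probe = sc.textFile(path)

    probe.filter(_ => true).count()          // closes over nothing at all
    val ev = event1                          // local copy of the method parameter
    probe.filter(_.startsWith(ev)).count()   // closes over a single String
    // If both of these succeed but the original filter call fails, the difference is
    // whatever extra state the original lambda referenced (fields, methods, this, ...).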

-- 
Some people, when confronted with a problem, think "I know, I'll use regular expressions." Now they have two problems.
-- Jamie Zawinski