Posted to user@spark.apache.org by Ognen Duzlevski <og...@nengoiksvelzud.com> on 2014/03/07 04:39:28 UTC
Running actions in loops
Hello,
What is the general approach people take when doing analysis across
multiple large files, where the data to be extracted from each successive
file depends on the data extracted from a previous file or set of files?
For example:
I have the following: a group of HDFS files, each 20+ GB in size. I need
to extract event1 on day 1 from the first file, extract event2 from all
remaining files in a period of successive dates, and then do a calculation
on the two events.
I then need to move on to day 2, extract event1 (with certain
properties), take all following days, extract event2, and run a
calculation against the previous day for all days in the period. And so on.
I have verified that the following (very naive) approach doesn't work:
def calcSimpleRetention(start: String, end: String, event1: String, event2: String): Map[String, List[Double]] = {
  val epd = new PipelineDate(end)
  val result = for {
    dt1 <- PipelineDate.getPeriod(new PipelineDate(start), epd)
    val f1 = sc.textFile(dt1.toJsonHdfsFileName)
    val e1 = f1.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event1)
      .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 0)).cache
    val c = e1.count.toDouble
    val intres = for {
      dt2 <- PipelineDate.getPeriod(dt1 + 1, epd)
      val f2 = sc.textFile(dt2.toJsonHdfsFileName)
      val e2 = f2.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event2)
        .map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 1))
      val e1e2 = e1.union(e2)
      val r = e1e2.groupByKey()
        .filter(e => e._2.length > 1 && e._2.filter(_ == 0).length > 0)
        .count.toDouble
    } yield (c / r) // get the retention rate
  } yield (dt1.toString -> intres)
  Map(result: _*)
}
I am getting the following errors:
14/03/07 03:22:25 INFO SparkContext: Starting job: count at CountActor.scala:33
14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at CountActor.scala:33) with 140 output partitions (allowLocal=false)
14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:33)
14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List()
14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List()
14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at CountActor.scala:32), which has no missing parents
14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at CountActor.scala:33
14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I should mention that this code is fired off from an Akka actor (which
is controlled by a Scalatra servlet).
Any ideas, recommendations, etc.? I am fairly new to Scala and M/R
principles in general; it is fair to say that at this point I am still
thinking from the point of view of an imperative programmer trying to fit
a square peg through a round hole ;)
Ognen
Re: Running actions in loops
Posted by Nick Pentreath <ni...@gmail.com>.
There is a #3, which is to use mapPartitions and initialize one Joda-Time object per partition; that is less overhead for large objects.
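A minimal sketch of that idea, reusing f1 from the code in the first message (the field position and date format are assumptions, not from the thread):

import org.joda.time.format.DateTimeFormat

// One formatter per partition: it is created on the worker inside
// mapPartitions, so it is never captured by a closure on the driver
// and never needs to be serializable.
val perDay = f1.mapPartitions { lines =>
  val fmt = DateTimeFormat.forPattern("yyyy-MM-dd") // once per partition
  lines.map { line =>
    val ts = line.split(",")(1)                     // assumed date field
    (fmt.parseDateTime(ts).toString("yyyy-MM-dd"), 1)
  }
}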
On Sat, Mar 8, 2014 at 2:54 AM, Mayur Rustagi <ma...@gmail.com>
wrote:
> So the whole function closure you want to apply to your RDD needs to be
> serializable, so that it can be serialized and sent to the workers to
> operate on the RDD. Joda-Time objects cannot be serialized and sent, so
> Joda-Time is ruled out. Two bad answers:
> 1. Initialize Joda-Time for each row, do the work, and destroy the objects;
> that way they are only created while the task is running and never have to
> be sent across.
> 2. Write your own parser and hope the Joda-Time guys get their act together.
Re: Running actions in loops
Posted by Mayur Rustagi <ma...@gmail.com>.
So the whole function closure you want to apply to your RDD needs to be
serializable, so that it can be serialized and sent to the workers to
operate on the RDD. Joda-Time objects cannot be serialized and sent, so
Joda-Time is ruled out. Two bad answers:
1. Initialize Joda-Time for each row, do the work, and destroy the objects;
that way they are only created while the task is running and never have to
be sent across (see the sketch after this list).
2. Write your own parser and hope the Joda-Time guys get their act together.
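A minimal sketch of option 1, again reusing f1 from the first message (the field position and date format are assumptions):

import org.joda.time.format.DateTimeFormat

// Per-record initialization: the formatter is created inside the map
// function, so it only ever exists on the worker and is never shipped.
// Correct, but it pays the construction cost once per record; the
// mapPartitions variant above amortizes that to once per partition.
val perDay = f1.map { line =>
  val fmt = DateTimeFormat.forPattern("yyyy-MM-dd") // built per record
  (fmt.parseDateTime(line.split(",")(1)).toString("yyyy-MM-dd"), 1)
}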
Regards
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>
On Fri, Mar 7, 2014 at 12:56 PM, Ognen Duzlevski
<og...@nengoiksvelzud.com>wrote:
> Mayur, I had not thought of that. Yes, I use Joda-Time. What is the scope
> that this serialization issue applies to? Only the method making a call
> into / using such a library? The whole class that the method belongs to?
> Sorry if it is a dumb question :)
>
> Ognen
Re: Running actions in loops
Posted by Ognen Duzlevski <og...@nengoiksvelzud.com>.
Mayur, I had not thought of that. Yes, I use Joda-Time. What is the scope
that this serialization issue applies to? Only the method making a call
into / using such a library? The whole class that the method belongs to?
Sorry if it is a dumb question :)
Ognen
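For concreteness (not part of the thread): a minimal sketch of the two scopes in question, with hypothetical names. Referencing any field or method of an enclosing class from inside a closure captures the whole instance, which is exactly how the CountActor above ends up in the task.

import org.apache.spark.SparkContext

class Analytics(sc: SparkContext) {            // not Serializable
  val event = "login"                          // instance field

  def countBad(path: String): Long =
    // `event` here really means `this.event`, so the closure captures the
    // whole Analytics instance -> NotSerializableException.
    sc.textFile(path).filter(_.contains(event)).count()

  def countGood(path: String): Long = {
    val ev = event                             // copy the field to a local val
    // Now the closure captures only the String `ev`.
    sc.textFile(path).filter(_.contains(ev)).count()
  }
}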
On 3/7/14, 1:29 PM, Mayur Rustagi wrote:
> Most likely the job you are executing is not serializable; this typically
> happens when you use a library that is not serializable. Are you using
> any library like Joda-Time?
Re: Running actions in loops
Posted by Mayur Rustagi <ma...@gmail.com>.
Most likely the job you are executing is not serializable; this typically
happens when you use a library that is not serializable. Are you using
any library like Joda-Time?
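A minimal repro of that failure mode (the path and format are hypothetical; Joda-Time's DateTimeFormatter does not implement java.io.Serializable):

import org.joda.time.format.DateTimeFormat

// The formatter is built on the driver and captured by the map closure,
// so Spark tries (and fails) to serialize it along with the task.
val fmt = DateTimeFormat.forPattern("yyyy-MM-dd")
val days = sc.textFile("hdfs:///events/sample.json") // hypothetical path
  .map(line => fmt.parseDateTime(line.take(10)))     // closure captures fmt
// => org.apache.spark.SparkException: Job aborted: Task not serializable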
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>
On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski <ognen@plainvanillagames.com
> wrote:
> It looks like the problem is in the filter task - is there anything
> special about filter()?
>
> I have removed the filter line from the loops just to see whether things
> would work, and they do.
>
> Does anyone have any ideas?
>
> Thanks!
> Ognen
Re: Running actions in loops
Posted by Ognen Duzlevski <og...@plainvanillagames.com>.
It looks like the problem is in the filter task - is there anything
special about filter()?
I have removed the filter line from the loops just to see whether things
would work, and they do.
Does anyone have any ideas?
Thanks!
Ognen
--
Some people, when confronted with a problem, think "I know, I'll use regular expressions." Now they have two problems.
-- Jamie Zawinski