Posted to user@spark.apache.org by lannyripple <la...@gmail.com> on 2014/03/27 05:34:31 UTC

Not getting it

Hi all,

I've got something which I think should be straightforward, but it's not, so
I'm not getting it.

I have an 8-node Spark 0.9.0 cluster that also runs HDFS.  Workers have 16g of
memory and 8 cores.

In HDFS I have a CSV file of 110M lines of 9 columns (e.g., [key,a,b,c...]).
I have another file of 25K lines containing some number of keys which might
be in my CSV file.  (Yes, I know I should use an RDBMS or Shark or
something.  I'll get to that, but this is a toy problem that I'm using to get
some intuition with Spark.)

Working on each file individually, Spark has no problem manipulating the
files.  If I try a join, or a union+filter, though, I can't seem to compute
the join of the two files.  The code is along the lines of

    val fileA = sc.textFile("hdfs://.../fileA_110M.csv")
      .map{_.split(",")}
      .keyBy{_(0)}
    val fileB = sc.textFile("hdfs://.../fileB_25k.csv").keyBy{x => x}

Trying things like fileA.join(fileB) gives me a heap OOM.  And trying

    (fileA ++ fileB.map{case (k, v) => (k, Array(v))})
      .groupBy{_._1}
      .filter{case (_, xs) => xs.exists{_._2.length == 1}}

just causes Spark to freeze.  (In all the cases I'm trying, I just use a
final .count to force the results.)

I suspect I'm missing something fundamental about bringing the keyed data
together into the same partitions so it can be efficiently joined, but I've
given up for now.  If anyone can shed some light (beyond "No, really.  Use
Shark.") on what I'm not understanding, it would be most helpful.
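For concreteness, here is roughly what I imagine "bringing the keyed data
together into the same partitions" would look like.  Only a sketch: the
partition count of 512 is made up, and I'm assuming the pair-RDD implicits
(automatic in the shell) are in scope.

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._  // pair-RDD implicits

    // Hash both sides into the same set of partitions so matching keys land
    // together before the join.  512 partitions is an assumption, not tuned.
    val part = new HashPartitioner(512)
    val partA = fileA.partitionBy(part)
    val partB = fileB.partitionBy(part)

    // With identical partitioners the join needs no further shuffle; each
    // task joins one co-located pair of partitions.
    partA.join(partB).count()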




Re: Not getting it

Posted by lannyripple <la...@gmail.com>.
I've played around with it.  The CSV file looks like it gives 130
partitions.  I'm assuming that's the standard 64MB split size for HDFS
files.  I have increased the number of partitions and the number of tasks for
things like groupByKey and such.  Usually I start blowing up with GC overhead
limit exceeded or, sometimes, heap OOM errors.  I recently tried throwing
coalesce with shuffle = true into the mix, thinking it would bring the keys
into the same partitions.  E.g.,

    (fileA ++ fileB.map{case (k, v) => (k, Array(v))})
      .coalesce(fileA.partitions.length + fileB.partitions.length, shuffle = true)
      .groupBy...

(which should effectively imitate map-reduce), but I see GC overhead limit
exceeded errors when I do that.
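For comparison, I believe the same union-and-group could take the partition
count directly on the shuffle instead of going through coalesce.  Again just
a sketch; 512 is a placeholder, not a tuned value.

    // Hand the partition count straight to the shuffle operation.
    val unioned = fileA ++ fileB.map{case (k, v) => (k, Array(v))}
    val grouped = unioned.groupByKey(512)  // RDD[(String, Seq[Array[String]])]

    // A key was present in fileB if one of its grouped values is the
    // single-element Array(v) marker.
    grouped.filter{case (_, xs) => xs.exists{_.length == 1}}.count()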

I've got a stock install with num cores and worker memory set as mentioned,
but even something like this

    fileA.sortByKey().map{_ => 1}.reduce{_ + _}

blows up with GC overhead limit exceeded (as did .count instead of the by-hand
count).

    fileA.count

works.  Spark seems to be able to load the file as an RDD but not to
manipulate it.




On Fri, Mar 28, 2014 at 3:04 AM, Sonal Goyal [via Apache Spark User List] <
ml-node+s1001560n3417h16@n3.nabble.com> wrote:

> Have you tried setting the partitioning?





Re: Not getting it

Posted by lannyripple <la...@gmail.com>.
OK.  Based on Sonal's message I dug deeper into memory and partitioning and
got it to work.

For the CSV file I used 1024 partitions [textFile(path, 1024)], which cut
the partition size down to about 8MB (from the standard 64MB HDFS splits).
For the key file I also adjusted the partitions to about 8MB each.  The join
was still blowing up with GC overhead limit exceeded and heap OOM errors.  I
then set SPARK_MEM (which is hard to tease out of the documentation) to 4g,
and the join completed.

Going back to find SPARK_MEM, I found this to be the best explanation --
https://groups.google.com/forum/#!searchin/spark-users/SPARK_MEM/spark-users/ou6cJMlBj_M/NlBHYDjG_NYJ

At a guess, setting SPARK_MEM did more than changing the partitions.
Something to play around with.
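
For anyone who hits this later, here is roughly the combination that worked,
written out as a standalone sketch.  I'm assuming spark.executor.memory is
the conf-property equivalent of exporting SPARK_MEM=4g (that's my reading of
the thread above), and the app name is made up.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // pair-RDD implicits

    // SPARK_MEM=4g expressed as a conf property (my understanding of the docs).
    val conf = new SparkConf()
      .setAppName("not-getting-it")
      .set("spark.executor.memory", "4g")
    val sc = new SparkContext(conf)

    // 1024 splits cut the roughly 8GB CSV down to about 8MB per partition.
    val fileA = sc.textFile("hdfs://.../fileA_110M.csv", 1024)
      .map{_.split(",")}
      .keyBy{_(0)}
    val fileB = sc.textFile("hdfs://.../fileB_25k.csv").keyBy{x => x}

    fileA.join(fileB).count()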







Re: Not getting it

Posted by Sonal Goyal <so...@gmail.com>.
Have you tried setting the partitioning?
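Something along these lines, for example, using the fileA and fileB from your
mail (the partition count of 512 is only illustrative):

    // Pass an explicit partition count (or a Partitioner) to the join itself
    // so the shuffle produces more, smaller partitions.
    fileA.join(fileB, 512).count()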

Best Regards,
Sonal
Nube Technologies <http://www.nubetech.co>

<http://in.linkedin.com/in/sonalgoyal>



