Posted to user@spark.apache.org by Timothy Perrigo <tp...@gmail.com> on 2013/10/21 22:05:38 UTC

Help with Initial Cluster Configuration / Tuning

Hi everyone,
I am very new to Spark, so as a learning exercise I've set up a small
cluster consisting of 4 EC2 m1.large instances (1 master, 3 slaves), which
I'm hoping to use to calculate ngram frequencies from text files of various
sizes (I'm not doing anything with them; I just thought this would be
slightly more interesting than the usual 'word count' example).  Currently,
I'm trying to work with a 1GB text file, but running into memory issues.
 I'm wondering what parameters I should be setting (in spark-env.sh) in
order to properly utilize the cluster.  Right now, I'd be happy just to
have the process complete successfully with the 1 gig file, so I'd really
appreciate any suggestions you all might have.

Here's a summary of the code I'm running through the spark shell on the
master:

def ngrams(s: String, n: Int = 3): Seq[String] = {
  (s.split("\\s+").sliding(n)).filter(_.length == n).map(_.mkString(" ")).map(_.trim).toList
}

val text = sc.textFile("s3n://my-bucket/my-1gb-text-file")

val mapped = text.filter(_.trim.length > 0).flatMap(ngrams(_, 3))

So far so good; the problems come during the reduce phase.  With small
files, I was able to issue the following to calculate the most frequently
occurring trigram:

val topNgram = (mapped countByValue) reduce((a:(String, Long), b:(String,
Long)) => if (a._2 > b._2) a else b)

With the 1 gig file, though, I've been running into OutOfMemory errors, so
I decided to split the reduction to several steps, starting with simply
issuing countByValue of my "mapped" RDD, but I have yet to get it to
complete successfully.

SPARK_MEM is currently set to 6154m.  I also bumped up the
spark.akka.frameSize setting to 500 (though at this point, I was grasping
at straws; I'm not sure what a "proper" value would be).  What properties
should I be setting for a job of this size on a cluster of 3 m1.large
slaves? (The cluster was initially configured using the spark-ec2 scripts).
 Also, programmatically, what should I be doing differently?  (For example,
should I be setting the minimum number of splits when reading the text
file?  If so, what would be a good default?).

I apologize for what I'm sure are very naive questions.  I think Spark is a
fantastic project and have enjoyed working with it, but I'm still very much
a newbie and would appreciate any help you all can provide (as well as any
'rules-of-thumb' or best practices I should be following).

Thanks,
Tim Perrigo
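
On the minimum-splits question above: the optional second argument to sc.textFile sets a lower bound on the number of input partitions. The following is only a sketch of one way to pick a starting value. It assumes the rule of thumb from the Spark tuning guide of roughly 2-3 tasks per CPU core, and it assumes each m1.large worker exposes 2 cores. The helper name suggestedSplits is hypothetical and not part of Spark.

```scala
// Hypothetical helper (not part of Spark): derive a starting partition count
// from the cluster size, using a tasks-per-core multiplier as a rule of thumb.
def suggestedSplits(workers: Int, coresPerWorker: Int, tasksPerCore: Int = 3): Int = {
  require(workers > 0 && coresPerWorker > 0 && tasksPerCore > 0,
    "all arguments must be positive")
  workers * coresPerWorker * tasksPerCore
}

// Assumption: 3 workers with 2 cores each, as on the cluster described above.
val splits = suggestedSplits(workers = 3, coresPerWorker = 2)
println(splits) // 3 * 2 * 3 = 18

// In the spark shell, the value would be passed as the second argument:
//   val text = sc.textFile("s3n://my-bucket/my-1gb-text-file", splits)
```

This can be pasted into the Scala or spark shell. The result is a starting point to adjust by observation, not a tuned setting.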

Re: Help with Initial Cluster Configuration / Tuning

Posted by Matei Zaharia <ma...@gmail.com>.
Hi Shay and Timothy,

We're very aware of this issue, and we want to improve both the documentation and the out-of-the-box behavior for these cases. Right now the closest thing is the tuning guide here: http://spark.incubator.apache.org/docs/latest/tuning.html, but it's only a small step in this direction. Because these issues are so common, the goal should be to eliminate them completely.

Matei

On Oct 22, 2013, at 7:22 AM, Shay Seng <sh...@1618labs.com> wrote:

> Hi Matei,
> 
> I've seen several memory tuning queries on this mailing list, and also heard the same kinds of queries at the Spark meetup. In fact, the last bullet point in Josh Carver's(?) slides, the guy from Bizo, was "memory tuning is still a mystery".
> 
> I certainly had lots of issues when I first started. From memory issues to GC issues, things seem to run fine until you try something with 500GB of data, etc.
> 
> I was wondering if you could write up a little white paper or some guidelines on how to set memory values, and what to look at when something goes wrong? E.g., I would never have guessed that countByValue happens on a single machine.
> 
> On Oct 21, 2013 6:18 PM, "Matei Zaharia" <ma...@gmail.com> wrote:
> Hi there,
> 
> The problem is that countByValue happens in only a single reduce task -- this is probably something we should fix but it's basically not designed for lots of values. Instead, do the count in parallel as follows:
> 
> val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b)
> 
> If this still has trouble, you can also increase the level of parallelism of reduceByKey by passing it a second parameter for the number of tasks (e.g. 100).
> 
> BTW one other small thing with your code: flatMap should actually work fine if your function returns an Iterator or Traversable, so there's no need to call toList and return a Seq in ngrams; you can just return an Iterator[String].
> 
> Matei
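
Putting the two suggestions above together, a minimal self-contained sketch might look like the following. It assumes Spark is on the classpath and runs in local mode on a tiny in-memory sample instead of the S3 file. The object name TopTrigram, the sample lines, and the choice of 4 reduce tasks are illustrative assumptions, not values from this thread. It also trims each line before splitting, a small change from the original ngrams.

```scala
import org.apache.spark.SparkContext
// Brings pair-RDD operations such as reduceByKey into scope on older Spark
// releases; newer releases find them without this import.
import org.apache.spark.SparkContext._

object TopTrigram {
  // Returns an Iterator, so no intermediate List is built for each line.
  // sliding(n) yields one short window when a line has fewer than n words,
  // hence the length filter.
  def ngrams(s: String, n: Int = 3): Iterator[String] =
    s.trim.split("\\s+").sliding(n).filter(_.length == n).map(_.mkString(" "))

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with 2 threads, for illustration only.
    val sc = new SparkContext("local[2]", "top-trigram")
    try {
      // Stand-in for sc.textFile(...): a few made-up lines.
      val text = sc.parallelize(Seq("a b c d", "a b c", "", "x a b c"))

      val counts = text
        .filter(_.trim.nonEmpty)
        .flatMap(line => ngrams(line, 3))
        .map(ngram => (ngram, 1L))
        // The second argument is the number of reduce tasks (4 is arbitrary here).
        .reduceByKey((a: Long, b: Long) => a + b, 4)

      // The counts stay distributed; only the winning pair reaches the driver.
      val top = counts.reduce((a, b) => if (a._2 >= b._2) a else b)
      println(top) // (a b c,3) for the sample lines above
    } finally {
      sc.stop()
    }
  }
}
```

Because the maximum is also computed with an RDD reduce, the full table of counts is never collected into a single map on one machine.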
> 


Re: Help with Initial Cluster Configuration / Tuning

Posted by Mark Hamstra <ma...@clearstorydata.com>.
Yes, that is certainly feasible.  But Chapter 11
<http://shop.oreilly.com/product/0636920028512.do> isn't written yet.


On Tue, Oct 22, 2013 at 11:02 AM, Timothy Perrigo <tp...@gmail.com> wrote:

> As the newbie who started the conversation, I'd like to thank everyone for
> the feedback and the subsequent discussion.  I certainly understand the
> point that there's no magic rule book that can take the place of learning
> the ins-and-outs of distributed / cluster computing-- a certain amount of
> pain is to be expected.  I'd like to add, too, that so far, with Spark,
> this pain has been surprisingly minimal, thanks in no small part to the
> information I've gleaned (directly or indirectly) from this mailing list.
>
> However, any additional information is always welcome.  In my own case,
> what I think I would really benefit from would be a start-to-finish example
> of a problem that works on a large-ish dataset.  In particular, it would be
> helpful to know what parameters have to be considered, what they are set
> to, and the rationale behind how those values were obtained, as well as a
> discussion about determining a "good" cluster size / configuration for the
> example problem.  (In fact, if anyone knows of such an example, I would be
> very appreciative!).  This certainly won't make everything completely
> painless, but would be invaluable and certainly seems feasible.
>
> Thanks again everyone for your help and advice.
>
> Tim
>
>
> On Tue, Oct 22, 2013 at 12:01 PM, Mark Hamstra <ma...@clearstorydata.com> wrote:
>
>> Yes, there are certainly rough spots and sharp edges that we can work at
>> polishing out and rounding over; and there are people working on such
>> things.  Don't get me wrong, feedback from users about what they are
>> finding too difficult, opaque or impenetrable is useful; but I don't think
>> that the expectation that working with a framework like Spark should be
>> smooth and easy can be completely met.  Even when all of the documentation,
>> guidance, instrumentation and user interface are in place, there will still
>> be a lot for users to come to terms with.
>>
>>
>> On Tue, Oct 22, 2013 at 9:50 AM, Aaron Davidson <il...@gmail.com> wrote:
>>
>>> On the other hand, I totally agree that memory usage in Spark is rather
>>> opaque, and is one area where we could do a lot better in terms of
>>> communicating issues, through both docs and instrumentation. At least with
>>> serialization and such, you can get meaningful exceptions (hopefully), but
>>> OOMs are just blanket "something wasn't right somewhere." Debugging them
>>> empirically would require deep diving into Spark's heap allocations, which
>>> requires a lot more knowledge of Spark internals than should be required
>>> for general usage.
>>>
>>>
>>> On Tue, Oct 22, 2013 at 9:22 AM, Mark Hamstra <ma...@clearstorydata.com> wrote:
>>>
>>>> Yes, but that also illustrates the problem faced by anyone trying to
>>>> write a "little white paper or guide lines" to make newbies' experience
>>>> painless.  Distributed computing clusters are necessarily complex things,
>>>> and problems can crop up in multiple locations, layers or subsystems.  It's
>>>> just not feasible to quickly bring up to speed someone with no experience
>>>> in distributed programming and cluster systems.  It takes a lot of
>>>> knowledge, both broad and deep.  Very few people have the complete scope of
>>>> knowledge and experience required, so creating, debugging and maintaining a
>>>> cluster computing application almost always has to be a team effort.
>>>>
>>>> Support organizations and communities can replace some of the need for
>>>> a knowledgeable and well-functioning team, but not all of it; and at some
>>>> point you have to expect that debugging is going to take a considerable
>>>> amount of painstaking, systematic effort -- including a close reading of
>>>> the available docs.
>>>>
>>>> Several people are working on making more and better reference and
>>>> training material available, and some of that will include trouble-shooting
>>>> guidance, but that doesn't mean that there can ever be "one little paper"
>>>> to solve newbies' (or more experienced developers') problems or provide
>>>> adequate guidance.  There's just too much to cover and too many different
>>>> kinds or levels of initial-user knowledge to make that completely feasible.
>>>>
>>>>
>>>>
>>>> On Tue, Oct 22, 2013 at 8:50 AM, Shay Seng <sh...@1618labs.com> wrote:
>>>>
>>>>> Hey Mark, I didn't mean to say that the information isn't out there --
>>>>> just that when something goes wrong with spark, the scope of what could be
>>>>> wrong is so large - some bad setting with JVM, serializer, akka, badly
>>>>> written scala code, algorithm wrong, check worker logs, check executor
>>>>> stderrs, ....
>>>>>
>>>>> When I looked at this post this morning, my initial thought wasn't
>>>>> that "countByValue" would be at fault. ...probably since I've only been
>>>>> using Scala/Spark for a month or so.
>>>>>
>>>>> It was just a suggestion to help newbies come up to speed more quickly
>>>>> and gain insights into how to debug issues.
>>>>>
>>>>>
>>>>> On Tue, Oct 22, 2013 at 8:14 AM, Mark Hamstra <mark@clearstorydata.com> wrote:
>>>>>
>>>>>> There's no need to guess at that.  The docs tell you directly:
>>>>>>
>>>>>> def countByValue(): Map[T, Long]
>>>>>>
>>>>>> Return the count of each unique value in this RDD as a map of (value,
>>>>>> count) pairs. The final combine step happens locally on the master,
>>>>>> equivalent to running a single reduce task.
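
To make the quoted documentation concrete, here is a plain-Scala simulation with no Spark involved. It is only a sketch of the idea: the sample partitions, the helper names localCounts and merge, and the bucket count are made up for illustration, and hashing keys into buckets is a simplified stand-in for how a partitioned reduce spreads keys across tasks.

```scala
// Made-up data: three "partitions" of trigrams.
val partitions: Seq[Seq[String]] = Seq(
  Seq("a b c", "b c d", "a b c"),
  Seq("c d e", "a b c"),
  Seq("d e f", "b c d")
)

// Count the values within one partition.
def localCounts(part: Seq[String]): Map[String, Long] =
  part.groupBy(identity).map { case (k, vs) => (k, vs.size.toLong) }

// Merge two count maps by summing the counts of shared keys.
def merge(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
  b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0L) + v) }

// countByValue-style: every partial map is merged in one place, so that one
// place ends up holding an entry for every distinct value.
val singleReducer: Map[String, Long] = partitions.map(localCounts).reduce(merge)

// reduceByKey-style: keys are hashed into numTasks buckets and each bucket is
// merged independently, so each task holds only a share of the keys.
val numTasks = 3
val buckets: Map[Int, Map[String, Long]] =
  partitions.map(localCounts)
    .flatMap(_.toSeq)
    .groupBy { case (k, _) => math.abs(k.hashCode % numTasks) }
    .map { case (bucket, pairs) => (bucket, pairs.map(p => Map(p)).reduce(merge)) }

println(singleReducer.size)              // 4 distinct trigrams, all in one map
println(buckets.values.map(_.size).sum)  // the same 4 keys, spread over the buckets
```

With four distinct values the difference is invisible, but with the millions of distinct trigrams in a 1GB file, the single merged map is what has to fit in one machine's memory.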
>>>>>>
>>>>>>
>>>>>>

Re: Help with Initial Cluster Configuration / Tuning

Posted by Timothy Perrigo <tp...@gmail.com>.
As the newbie who started the conversation, I'd like to thank everyone for
the feedback and the subsequent discussion.  I certainly understand the
point that there's no magic rule book that can take the place of learning
the ins-and-outs of distributed / cluster computing-- a certain amount of
pain is to be expected.  I'd like to add, too, that so far, with Spark,
this pain has been surprisingly minimal, thanks in no small part to the
information I've gleaned (directly or indirectly) from this mailing list.

However, any additional information is always welcome.  In my own case,
what I think I would really benefit from would be a start-to-finish example
of a problem that works on a large-ish dataset.  In particular, it would be
helpful to know what parameters have to be considered, what they are set
to, and the rationale behind how those values were obtained, as well as a
discussion about determining a "good" cluster size / configuration for the
example problem.  (In fact, if anyone knows of such an example, I would be
very appreciative!).  This certainly won't make everything completely
painless, but would be invaluable and certainly seems feasible.

Thanks again everyone for your help and advice.

Tim



Re: Help with Initial Cluster Configuration / Tuning

Posted by Mark Hamstra <ma...@clearstorydata.com>.
Yes, there are certainly rough spots and sharp edges that we can work at
polishing out and rounding over; and there are people working on such
things.  Don't get me wrong, feedback from users about what they are
finding too difficult, opaque or impenetrable is useful; but I don't think
that the expectation that working with a framework like Spark should be
smooth and easy can be completely met.  Even when all of the documentation,
guidance, instrumentation and user interface are in place, there will still
be a lot for users to come to terms with.


On Tue, Oct 22, 2013 at 9:50 AM, Aaron Davidson <il...@gmail.com> wrote:

> On the other hand, I totally agree that memory usage in Spark is rather
> opaque, and is one area where we could do a lot better at in terms of
> communicating issues, through both docs and instrumentation. At least with
> serialization and such, you can get meaningful exceptions (hopefully), but
> OOMs are just blanket "something wasn't right somewhere." Debugging them
> empirically would require deep diving into Spark's heap allocations, which
> requires a lot more knowledge of Spark internals than should be required
> for general usage.
>
>
> On Tue, Oct 22, 2013 at 9:22 AM, Mark Hamstra <ma...@clearstorydata.com>wrote:
>
>> Yes, but that also illustrates the problem faced by anyone trying to
>> write a "little white paper or guide lines" to make newbies' experience
>> painless.  Distributed computing clusters are necessarily complex things,
>> and problems can crop up in multiple locations, layers or subsystems.  It's
>> just not feasible to quickly bring up to speed someone with no experience
>> in distributed programming and cluster systems.  It takes a lot of
>> knowledge, both broad and deep.  Very few people have the complete scope of
>> knowledge and experience required, so creating, debugging and maintaining a
>> cluster computing application almost always has to be a team effort.
>>
>> Support organizations and communities can replace some of the need for a
>> knowledgeable and well-functioning team, but not all of it; and at some
>> point you have to expect that debugging is going to take a considerable
>> amount of painstaking, systematic effort -- including a close reading of
>> the available docs.
>>
>> Several people are working on making more and better reference and
>> training material available, and some of that will include trouble-shooting
>> guidance, but that doesn't mean that there can ever be "one little paper"
>> to solve newbies' (or more experienced developers') problems or provide
>> adequate guidance.  There's just too much to cover and too many different
>> kinds or levels of initial-user knowledge to make that completely feasible.

Re: Help with Initial Cluster Configuration / Tuning

Posted by Aaron Davidson <il...@gmail.com>.
On the other hand, I totally agree that memory usage in Spark is rather
opaque, and is one area where we could do a lot better in terms of
communicating issues, through both docs and instrumentation. At least with
serialization and such, you can get meaningful exceptions (hopefully), but
OOMs are just blanket "something wasn't right somewhere." Debugging them
empirically would require deep diving into Spark's heap allocations, which
requires a lot more knowledge of Spark internals than should be required
for general usage.


On Tue, Oct 22, 2013 at 9:22 AM, Mark Hamstra <ma...@clearstorydata.com> wrote:

> Yes, but that also illustrates the problem faced by anyone trying to write
> a "little white paper or guide lines" to make newbies' experience painless.
>  Distributed computing clusters are necessarily complex things, and
> problems can crop up in multiple locations, layers or subsystems.  It's
> just not feasible to quickly bring up to speed someone with no experience
> in distributed programming and cluster systems.  It takes a lot of
> knowledge, both broad and deep.  Very few people have the complete scope of
> knowledge and experience required, so creating, debugging and maintaining a
> cluster computing application almost always has to be a team effort.
>
> Support organizations and communities can replace some of the need for a
> knowledgeable and well-functioning team, but not all of it; and at some
> point you have to expect that debugging is going to take a considerable
> amount of painstaking, systematic effort -- including a close reading of
> the available docs.
>
> Several people are working on making more and better reference and
> training material available, and some of that will include trouble-shooting
> guidance, but that doesn't mean that there can ever be "one little paper"
> to solve newbies' (or more experienced developers') problems or provide
> adequate guidance.  There's just too much to cover and too many different
> kinds or levels of initial-user knowledge to make that completely feasible.

Re: Help with Initial Cluster Configuration / Tuning

Posted by Mark Hamstra <ma...@clearstorydata.com>.
Yes, but that also illustrates the problem faced by anyone trying to write
a "little white paper or guide lines" to make newbies' experience painless.
 Distributed computing clusters are necessarily complex things, and
problems can crop up in multiple locations, layers or subsystems.  It's
just not feasible to quickly bring up to speed someone with no experience
in distributed programming and cluster systems.  It takes a lot of
knowledge, both broad and deep.  Very few people have the complete scope of
knowledge and experience required, so creating, debugging and maintaining a
cluster computing application almost always has to be a team effort.

Support organizations and communities can replace some of the need for a
knowledgeable and well-functioning team, but not all of it; and at some
point you have to expect that debugging is going to take a considerable
amount of painstaking, systematic effort -- including a close reading of
the available docs.

Several people are working on making more and better reference and training
material available, and some of that will include trouble-shooting
guidance, but that doesn't mean that there can ever be "one little paper"
to solve newbies' (or more experienced developers') problems or provide
adequate guidance.  There's just too much to cover and too many different
kinds or levels of initial-user knowledge to make that completely feasible.



On Tue, Oct 22, 2013 at 8:50 AM, Shay Seng <sh...@1618labs.com> wrote:

> Hey Mark, I didn't mean to say that the information isn't out there --
> just that when something goes wrong with spark, the scope of what could be
> wrong is so large - some bad setting with JVM, serializer, akka, badly
> written scala code, algorithm wrong, check worker logs, check executor
> stderrs, ....
>
> When I looked at this post this morning, my initial thought wasn't that
> "countByValue" would be at fault. ...probably since I've only been using
> Scala/Spark for a month or so.
>
> It was just a suggestion to help newbies come up to speed more quickly and
> gain insights into how to debug issues.

Re: Help with Initial Cluster Configuration / Tuning

Posted by Shay Seng <sh...@1618labs.com>.
Hey Mark, I didn't mean to say that the information isn't out there -- just
that when something goes wrong with spark, the scope of what could be wrong
is so large - some bad setting with JVM, serializer, akka, badly written
scala code, algorithm wrong, check worker logs, check executor stderrs,
....

When I looked at this post this morning, my initial thought wasn't that
"countByValue" would be at fault. ...probably since I've only been using
Scala/Spark for a month or so.

It was just a suggestion to help newbies come up to speed more quickly and
gain insights into how to debug issues.


On Tue, Oct 22, 2013 at 8:14 AM, Mark Hamstra <ma...@clearstorydata.com> wrote:

> There's no need to guess at that.  The docs tell you directly:
>
> def countByValue(): Map[T, Long]
>
> Return the count of each unique value in this RDD as a map of (value,
> count) pairs. The final combine step happens locally on the master,
> equivalent to running a single reduce task.

Re: Help with Initial Cluster Configuration / Tuning

Posted by Mark Hamstra <ma...@clearstorydata.com>.
There's no need to guess at that.  The docs tell you directly:

def countByValue(): Map[T, Long]

Return the count of each unique value in this RDD as a map of (value,
count) pairs. The final combine step happens locally on the master,
equivalent to running a single reduce task.
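
To make that doc line concrete, here is a minimal sketch, assuming a throwaway
local SparkContext and a tiny made-up data set (the names `words`, `local` and
`distributed` and the sample values are illustrative, not from the original
post). It shows that countByValue hands back an ordinary Scala Map inside the
driver program rather than an RDD, so every distinct value has to fit in the
memory of that one JVM:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pair-RDD implicits; only needed on older Spark releases

// Assumption: a throwaway local context. When pasting into spark-shell,
// skip this line and use the `sc` the shell already provides.
val sc = new SparkContext("local[2]", "countByValue-sketch")

// Hypothetical sample data standing in for the trigram RDD.
val words = sc.parallelize(Seq("a b c", "a b c", "b c d"))

// countByValue is an action: the result is a local Map held by the driver.
val local: scala.collection.Map[String, Long] = words.countByValue()

// reduceByKey is a transformation: the counts stay distributed in an RDD.
val distributed = words.map(w => (w, 1L)).reduceByKey(_ + _)

println(local("a b c"))
sc.stop()
```

With only a handful of distinct values the local Map is harmless; the trouble
starts when the number of distinct keys grows with the input, as it does for
trigrams.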



On Tue, Oct 22, 2013 at 7:22 AM, Shay Seng <sh...@1618labs.com> wrote:

> Hi Matei,
>
> I've seen several memory tuning queries on this mailing list, and also
> heard the same kinds of queries at the spark meetup. In fact the last
> bullet point in Josh Carver(?)'s slides, the guy from Bizo, was "memory
> tuning is still a mystery".
>
> I certainly had lots of issues when I first started. From memory issues
> to gc issues, things seem to run fine until you try something with 500GB of
> data etc.
>
> I was wondering if you could write up a little white paper or some guide
> lines on how to set memory values, and what to look at when something goes
> wrong? E.g. I would never have guessed that countByValue happens on a single
> machine etc.

Re: Help with Initial Cluster Configuration / Tuning

Posted by Shay Seng <sh...@1618labs.com>.
Hi Matei,

I've seen several memory tuning queries on this mailing list, and also
heard the same kinds of queries at the spark meetup. In fact the last
bullet point in Josh Carver(?)'s slides, the guy from Bizo, was "memory
tuning is still a mystery".

I certainly had lots of issues when I first started. From memory issues
to gc issues, things seem to run fine until you try something with 500GB of
data etc.

I was wondering if you could write up a little white paper or some guide
lines on how to set memory values, and what to look at when something goes
wrong? E.g. I would never have guessed that countByValue happens on a single
machine etc.
On Oct 21, 2013 6:18 PM, "Matei Zaharia" <ma...@gmail.com> wrote:

> Hi there,
>
> The problem is that countByValue happens in only a single reduce task --
> this is probably something we should fix but it's basically not designed
> for lots of values. Instead, do the count in parallel as follows:
>
> val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b)
>
> If this still has trouble, you can also increase the level of parallelism
> of reduceByKey by passing it a second parameter for the number of tasks
> (e.g. 100).
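
Putting these two suggestions together, a sketch of the whole job might look
like the following. It assumes a throwaway local SparkContext and three
made-up strings in place of the trigrams from the 1 GB file; the value 100 is
just the example figure quoted above, not a tuned setting. The final reduce
runs on the RDD itself, so each partition is boiled down to a single
(trigram, count) pair before anything is sent back to the driver.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pair-RDD implicits; only needed on older Spark releases

// Assumption: a throwaway local context. When pasting into spark-shell,
// skip this line and use the `sc` the shell already provides.
val sc = new SparkContext("local[2]", "top-trigram-sketch")

// Hypothetical stand-in for the `mapped` RDD of trigrams.
val mapped = sc.parallelize(Seq("the quick brown", "quick brown fox", "the quick brown"))

// Count in parallel; the second argument asks for 100 reduce tasks.
val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b, 100)

// Keep the pair with the larger count; only the winner reaches the driver.
val topNgram = counts.reduce((a, b) => if (a._2 > b._2) a else b)

println(topNgram)
sc.stop()
```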
>
> BTW one other small thing with your code, flatMap should actually work
> fine if your function returns an Iterator or Traversable, so there's no
> need to call toList and return a Seq in ngrams; you can just return an
> Iterator[String].
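
A minimal sketch of that change in plain Scala (no Spark needed to try it).
One deliberate difference from the posted version, flagged here as an
assumption about what was intended: the line is trimmed before splitting
instead of trimming each joined window afterwards, so leading whitespace
cannot produce an empty first token.

```scala
// Lazily yields each n-word window of `s`, joined by single spaces.
// Windows shorter than n (lines with fewer than n words) are dropped.
def ngrams(s: String, n: Int = 3): Iterator[String] =
  s.trim
    .split("\\s+")          // tokens separated by runs of whitespace
    .sliding(n)             // Iterator over windows of up to n tokens
    .filter(_.length == n)  // drop the short window from short lines
    .map(_.mkString(" "))

// Usage: materialise only when the values are actually needed.
val example = ngrams("to be or not to be").toList
println(example)
```

Because nothing is copied into a List per line, the flatMap over the text
file can stream the windows straight into the next stage.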
>
> Matei
>
> On Oct 21, 2013, at 1:05 PM, Timothy Perrigo <tp...@gmail.com> wrote:
>
> > Hi everyone,
> > I am very new to Spark, so as a learning exercise I've set up a small
> cluster consisting of 4 EC2 m1.large instances (1 master, 3 slaves), which
> I'm hoping to use to calculate ngram frequencies from text files of various
> sizes (I'm not doing anything with them; I just thought this would be
> slightly more interesting than the usual 'word count' example).  Currently,
> I'm trying to work with a 1GB text file, but running into memory issues.
>  I'm wondering what parameters I should be setting (in spark-env.sh) in
> order to properly utilize the cluster.  Right now, I'd be happy just to
> have the process complete successfully with the 1 gig file, so I'd really
> appreciate any suggestions you all might have.
> >
> > Here's a summary of the code I'm running through the spark shell on the
> master:
> >
> > def ngrams(s: String, n: Int = 3): Seq[String] = {
> >   (s.split("\\s+").sliding(n)).filter(_.length == n).map(_.mkString(" ")).map(_.trim).toList
> > }
> >
> > val text = sc.textFile("s3n://my-bucket/my-1gb-text-file")
> >
> > val mapped = text.filter(_.trim.length > 0).flatMap(ngrams(_, 3))
> >
> > So far so good; the problems come during the reduce phase.  With small
> files, I was able to issue the following to calculate the most frequently
> occurring trigram:
> >
> > val topNgram = (mapped countByValue) reduce((a:(String, Long), b:(String, Long)) => if (a._2 > b._2) a else b)
> >
> > With the 1 gig file, though, I've been running into OutOfMemory errors,
> so I decided to split the reduction to several steps, starting with simply
> issuing countByValue of my "mapped" RDD, but I have yet to get it to
> complete successfully.
> >
> > SPARK_MEM is currently set to 6154m.  I also bumped up the
> spark.akka.framesize setting to 500 (though at this point, I was grasping
> at straws; I'm not sure what a "proper" value would be).  What properties
> should I be setting for a job of this size on a cluster of 3 m1.large
> slaves? (The cluster was initially configured using the spark-ec2 scripts).
>  Also, programmatically, what should I be doing differently?  (For example,
> should I be setting the minimum number of splits when reading the text
> file?  If so, what would be a good default?).
> >
> > I apologize for what I'm sure are very naive questions.  I think Spark
> is a fantastic project and have enjoyed working with it, but I'm still very
> much a newbie and would appreciate any help you all can provide (as well as
> any 'rules-of-thumb' or best practices I should be following).
> >
> > Thanks,
> > Tim Perrigo
>
>

Re: Help with Initial Cluster Configuration / Tuning

Posted by Timothy Perrigo <tp...@gmail.com>.
I didn't realize that about countByValue...thanks!  And thanks for the code
advice as well; I appreciate all the help I can get!

I've switched from countByValue to the reduceByKey method you sent; with
that as the only change, I did see much better utilization of the nodes in
the cluster, but I eventually still got OutOfMemory errors.  I'm now trying
to set the number of tasks as the second parameter to reduceByKey, using
100 (as you suggested).  My question at this point is: is there any
rule-of-thumb for determining when you need to specify the number of tasks,
or for determining how many tasks to use?  Similarly, I see the
SparkContext.textFile method allows the setting of 'default min splits';
when would you want / need to set this parameter, and how would you
determine what to set it to?

Sorry to overwhelm you with questions!  I've been really enjoying working
with Spark, but I haven't been able to figure out how to answer these kinds
of questions.  Your help would be very much appreciated!

Tim



Re: Help with Initial Cluster Configuration / Tuning

Posted by Matei Zaharia <ma...@gmail.com>.
Hi there,

The problem is that countByValue happens in only a single reduce task -- this is probably something we should fix but it's basically not designed for lots of values. Instead, do the count in parallel as follows:

val counts = mapped.map(str => (str, 1)).reduceByKey((a, b) => a + b)

If this still has trouble, you can also increase the level of parallelism of reduceByKey by passing it a second parameter for the number of tasks (e.g. 100).
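
For illustration only, the parallel count with an explicit task count might be sketched as below. This is not code from the thread: the object name TopTrigram, the helper topValue, the local[2] master and the tiny sample data are all assumptions made so the example is self-contained. Only the map/reduceByKey line and the "second parameter" idea come from the message above.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // older Spark releases need this for reduceByKey
import org.apache.spark.rdd.RDD

// Hypothetical wrapper object, used only to make the sketch self-contained.
object TopTrigram {
  // Counts each distinct value in parallel, then picks the most frequent one.
  // numTasks is passed as the second argument to reduceByKey.
  def topValue(values: RDD[String], numTasks: Int): (String, Int) = {
    val counts = values.map(str => (str, 1)).reduceByKey((a, b) => a + b, numTasks)
    // This reduce runs over the distributed (value, count) pairs,
    // so the full table of counts is never collected on one machine.
    counts.reduce((a, b) => if (a._2 > b._2) a else b)
  }

  def main(args: Array[String]): Unit = {
    // Assumed local master for trying this out; a real job would use the cluster URL.
    val sc = new SparkContext("local[2]", "top-trigram-sketch")
    try {
      val sample = sc.parallelize(Seq("a b c", "b c d", "a b c"))
      println(topValue(sample, 4))
    } finally {
      sc.stop()
    }
  }
}
```

The sample uses 4 tasks because the data is tiny; the message above suggests something like 100 for the real job.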

BTW one other small thing with your code, flatMap should actually work fine if your function returns an Iterator or Traversable, so there's no need to call toList and return a Seq in ngrams; you can just return an Iterator[String].
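
A minimal sketch of what that change to ngrams could look like, suitable for pasting into the spark shell. It is an assumption about the intended shape, not code from the thread; in particular, trimming the line before splitting (instead of trimming each n-gram afterwards) is a small change from the original, made so that leading whitespace does not produce an empty first token.

```scala
// Lazily yields the n-grams of one line; no intermediate List is built.
def ngrams(s: String, n: Int = 3): Iterator[String] =
  s.trim
    .split("\\s+")          // tokens separated by runs of whitespace
    .sliding(n)             // Iterator over windows of up to n tokens
    .filter(_.length == n)  // drop the short window produced by short lines
    .map(_.mkString(" "))
```

With this version the existing call, text.filter(_.trim.length > 0).flatMap(ngrams(_, 3)), should not need to change.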

Matei

On Oct 21, 2013, at 1:05 PM, Timothy Perrigo <tp...@gmail.com> wrote:

> Hi everyone,
> I am very new to Spark, so as a learning exercise I've set up a small cluster consisting of 4 EC2 m1.large instances (1 master, 3 slaves), which I'm hoping to use to calculate ngram frequencies from text files of various sizes (I'm not doing anything with them; I just thought this would be slightly more interesting than the usual 'word count' example).  Currently, I'm trying to work with a 1GB text file, but running into memory issues.  I'm wondering what parameters I should be setting (in spark-env.sh) in order to properly utilize the cluster.  Right now, I'd be happy just to have the process complete successfully with the 1 gig file, so I'd really appreciate any suggestions you all might have.
> 
> Here's a summary of the code I'm running through the spark shell on the master:
> 
> def ngrams(s: String, n: Int = 3): Seq[String] = {
>   (s.split("\\s+").sliding(n)).filter(_.length == n).map(_.mkString(" ")).map(_.trim).toList
> }
> 
> val text = sc.textFile("s3n://my-bucket/my-1gb-text-file")
> 
> val mapped = text.filter(_.trim.length > 0).flatMap(ngrams(_, 3))
> 
> So far so good; the problems come during the reduce phase.  With small files, I was able to issue the following to calculate the most frequently occurring trigram:
> 
> val topNgram = (mapped countByValue) reduce((a:(String, Long), b:(String, Long)) => if (a._2 > b._2) a else b)
> 
> With the 1 gig file, though, I've been running into OutOfMemory errors, so I decided to split the reduction to several steps, starting with simply issuing countByValue of my "mapped" RDD, but I have yet to get it to complete successfully.
> 
> SPARK_MEM is currently set to 6154m.  I also bumped up the spark.akka.framesize setting to 500 (though at this point, I was grasping at straws; I'm not sure what a "proper" value would be).  What properties should I be setting for a job of this size on a cluster of 3 m1.large slaves? (The cluster was initially configured using the spark-ec2 scripts).  Also, programmatically, what should I be doing differently?  (For example, should I be setting the minimum number of splits when reading the text file?  If so, what would be a good default?).
> 
> I apologize for what I'm sure are very naive questions.  I think Spark is a fantastic project and have enjoyed working with it, but I'm still very much a newbie and would appreciate any help you all can provide (as well as any 'rules-of-thumb' or best practices I should be following).
> 
> Thanks,
> Tim Perrigo