Posted to user@crunch.apache.org by Everett Anderson <ev...@nuna.com> on 2015/08/10 22:49:34 UTC

Crunch performance & cluster configuration

Hi,

We've written a large processing pipeline in Crunch, which has been great
because it's testable and the code is rather clear.

When using the MapReduce runner, we end up with around 350 executed MR
applications for one month of input data. We're doing a lot of joins, so we
expect many applications.

I'm trying to figure out our strategy and cluster configurations for
scaling to more data on AWS EMR.

We've set our bytes per reduce target low enough that we usually have more
Map and Reduce tasks than machines, but not by much, and no given shard or
application seems to be a long pole.

I've noticed that

1) Most individual Map or Reduce jobs are short-lived, commonly 1-2 minutes
with our one month input data set.

2) Adding EMR Task instances (which don't participate in HDFS so must
send/receive everything over the network) does not help us scale -- their
CPU utilization is terrible.

3) Adding Core instances does seem to help reduce runtime, though their CPU
utilization starts going down.

This makes me suspect that our main bottleneck will be in either disk or
network I/O in shuffles.

Does anyone have pointers for evaluating or tweaking performance in a
many-MR application Crunch pipeline like this? Given Crunch makes it so
easy to write these, I suspect others would hit the same issues.

Would switching from MapReduce to Spark likely be a big win? My uninformed
impression is that Spark might require fewer disk operations, though I
don't see how it could avoid more cross-machine shuffles given our joins.

Thanks,
Everett

-- 
*DISCLAIMER:* The contents of this email, including any attachments, may 
contain information that is confidential, proprietary in nature, protected 
health information (PHI), or otherwise protected by law from disclosure, 
and is solely for the use of the intended recipient(s). If you are not the 
intended recipient, you are hereby notified that any use, disclosure or 
copying of this email, including any attachments, is unauthorized and 
strictly prohibited. If you have received this email in error, please 
notify the sender of this email. Please delete this and all copies of this 
email from your system. Any opinions either expressed or implied in this 
email and all attachments, are those of its author only, and do not 
necessarily reflect those of Nuna Health, Inc.

Re: Crunch performance & cluster configuration

Posted by Josh Wills <jw...@cloudera.com>.
On Wed, Aug 12, 2015 at 1:40 PM, Everett Anderson <ev...@nuna.com> wrote:

> Thanks, Josh!
>
> Couple of additional questions --
>
> 1) When you say you don't write many complex pipelines, what's your
> definition of complex? Is it more than N joins or cogroups?
>

I'm not sure, to be honest. You could certainly look at some of the Crunch
jobs I've written for stuff like Oryx or inside of Exhibit ETL and say that
those are pretty complex. But I think of what Cerner and you guys do as
"real" complex pipelines-- lots of different data sources that need to be
integrated to answer a lot of different questions.


> 2) You mentioned managing calls to o.a.h.conf.Configuration.get(). I
> don't think we're actually manipulating the Configuration from within our
> DoFns today. Are there places in Crunch that do it under the covers?
>

No, there shouldn't be. But that doesn't mean you shouldn't use the JVM's
-Xprof profiler as much as you can to understand what the equivalent of
o.a.h.conf.Configuration.get() is for your pipelines.
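
A minimal sketch of the kind of fix such a profile tends to point at, using
only the JDK: a lookup that is repeated for every record gets hoisted so it
runs once. java.util.Properties stands in for a job configuration here, and
the class, method, and property names are hypothetical, not Crunch or Hadoop
API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Stdlib-only sketch: Properties stands in for a job configuration object.
// All names here (ConfigLookupSketch, "pipeline.min.length") are hypothetical.
final class ConfigLookupSketch {

    // Slow shape: looks up and parses the same setting for every record.
    static int countLongRecordsPerRecordLookup(Properties conf, List<String> records) {
        int count = 0;
        for (String record : records) {
            int minLength = Integer.parseInt(conf.getProperty("pipeline.min.length", "0"));
            if (record.length() >= minLength) {
                count++;
            }
        }
        return count;
    }

    // Faster shape: read and parse the setting once, then reuse the value.
    static int countLongRecordsHoistedLookup(Properties conf, List<String> records) {
        final int minLength = Integer.parseInt(conf.getProperty("pipeline.min.length", "0"));
        int count = 0;
        for (String record : records) {
            if (record.length() >= minLength) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("pipeline.min.length", "3");
        List<String> records = Arrays.asList("a", "abc", "abcd");
        // Both variants return the same count; only the lookup cost differs.
        System.out.println(countLongRecordsPerRecordLookup(conf, records));
        System.out.println(countLongRecordsHoistedLookup(conf, records));
    }
}
```

In a Crunch DoFn the once-per-task place for this kind of read would
typically be initialize() rather than process(), assuming the value does not
change during the task.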


>
> 3) At the moment, I believe we're using the Writable type family with
> Thrifts and when we rekey tables to perform group by key operations /
> joins, we're using plain String keys. Do you think we'd still get benefit
> from switching to the Avro type family and using RawComparator even in this
> case?
>

That really should be fine, but again, if your profiler is spending a ton
of time inside of WritableComparator or Text, it's probably a sign that
there's an opportunity to deploy a more efficient custom implementation of
the grouping comparator, which you can always override in Crunch via the
GroupingOptions class that can be passed to groupByKey operations.
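
To make the idea of a more efficient comparator concrete, here is a
stdlib-only sketch that orders two serialized keys by their bytes in place,
without building key objects first. The method signature mirrors Hadoop's
RawComparator.compare(byte[], int, int, byte[], int, int), but the class name
is hypothetical and nothing here depends on Hadoop or Crunch. It assumes keys
are plain UTF-8 bytes with no length prefix, which is a simplification.

```java
import java.nio.charset.StandardCharsets;

// Stdlib-only sketch of a raw, byte-level key comparison.
// RawKeyComparatorSketch is a hypothetical name, not a Hadoop or Crunch class.
final class RawKeyComparatorSketch {

    // Compares b1[s1, s1+l1) with b2[s2, s2+l2) as unsigned bytes.
    static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int n = Math.min(l1, l2);
        for (int i = 0; i < n; i++) {
            int x = b1[s1 + i] & 0xff;
            int y = b2[s2 + i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return l1 - l2; // shared prefix: the shorter key sorts first
    }

    public static void main(String[] args) {
        // Two keys packed back to back in one buffer, as they might sit in a
        // sort buffer: "apple" at offset 0, "banana" at offset 5.
        byte[] buffer = "applebanana".getBytes(StandardCharsets.UTF_8);
        int result = compare(buffer, 0, 5, buffer, 5, 6);
        System.out.println(result < 0 ? "apple sorts before banana" : "unexpected order");
    }
}
```

A real comparator for a job would implement Hadoop's RawComparator and handle
whatever framing the key serialization adds; it would then be registered
through GroupingOptions as described above.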





-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: Crunch performance & cluster configuration

Posted by Everett Anderson <ev...@nuna.com>.
Thanks, Josh!

Couple of additional questions --

1) When you say you don't write many complex pipelines, what's your
definition of complex? Is it more than N joins or cogroups?

2) You mentioned managing calls to o.a.h.conf.Configuration.get(). I don't
think we're actually manipulating the Configuration from within our DoFns
today. Are there places in Crunch that do it under the covers?

3) At the moment, I believe we're using the Writable type family with
Thrifts and when we rekey tables to perform group by key operations /
joins, we're using plain String keys. Do you think we'd still get benefit
from switching to the Avro type family and using RawComparator even in this
case?








Re: Crunch performance & cluster configuration

Posted by Josh Wills <jw...@cloudera.com>.
Hey Everett,

My two cents (keeping in mind that I write very few incredibly complex
pipelines) is that the best way to improve MR performance is to generate as
many answers as you can per shuffle operation-- i.e., from a performance
perspective, you're better off cogrouping N data sources that need to be
joined on the same key and processing all of their child joins as part of a
single MR job than you are doing pairwise joins between the individual
sources and running (N choose 2) MR jobs independently of one another.
Whenever I'm confronted w/a bunch of outputs I need to generate, all of my
design/data modeling thought goes into creating them in as few jobs as
possible.
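
As a rough illustration of the cogroup-versus-pairwise point, the following
stdlib-only sketch groups N keyed sources in a single pass and also counts
how many pairwise joins the alternative would need. It is an in-memory
stand-in, not a MapReduce job; CogroupSketch, its methods, and the sample
keys and values are all made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only sketch: in-memory stand-in for cogrouping N keyed sources in
// one grouping pass. All names here are hypothetical, not Crunch API.
final class CogroupSketch {

    // Number of pairwise joins among n sources: n choose 2.
    static int pairwiseJoinCount(int n) {
        return n * (n - 1) / 2;
    }

    // Groups every source by key in one pass. For each key, slot i of the
    // result holds the values contributed by source i (possibly empty).
    static Map<String, List<List<String>>> cogroup(List<Map<String, List<String>>> sources) {
        Map<String, List<List<String>>> grouped = new TreeMap<>();
        for (int i = 0; i < sources.size(); i++) {
            for (Map.Entry<String, List<String>> e : sources.get(i).entrySet()) {
                List<List<String>> slots = grouped.get(e.getKey());
                if (slots == null) {
                    slots = new ArrayList<>();
                    for (int j = 0; j < sources.size(); j++) {
                        slots.add(new ArrayList<>());
                    }
                    grouped.put(e.getKey(), slots);
                }
                slots.get(i).addAll(e.getValue());
            }
        }
        return grouped;
    }

    public static void main(String[] args) {
        Map<String, List<String>> sourceA = new TreeMap<>();
        sourceA.put("k1", Arrays.asList("a1", "a2"));
        Map<String, List<String>> sourceB = new TreeMap<>();
        sourceB.put("k1", Arrays.asList("b1"));
        sourceB.put("k2", Arrays.asList("b2"));
        Map<String, List<String>> sourceC = new TreeMap<>();
        sourceC.put("k2", Arrays.asList("c1"));

        List<Map<String, List<String>>> sources = Arrays.asList(sourceA, sourceB, sourceC);
        // One grouping pass yields, per key, everything needed for any join
        // between these sources.
        System.out.println(cogroup(sources));
        System.out.println("pairwise joins avoided: " + pairwiseJoinCount(sources.size()));
    }
}
```

Once all values for a key sit side by side like this, each downstream join
or output for that key can be computed from the slots without another
shuffle. Crunch's own cogroup helpers live in org.apache.crunch.lib.Cogroup;
check which arities your version supports.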

With that aside, if you're spending all of your time serializing/deserializing
data or doing IO, it's probably worth a bit of profiling and managing
things like calls to o.a.h.conf.Configuration.get(), the type of
intermediate data serialization you're doing, and making sure that you're
using RawComparator implementations everywhere you can. One of the reasons
we try to push Avro everywhere we can in Crunch is the great out-of-the-box
support for doing shuffles w/o having to deserialize keys. Twitter gave a
great preso recently about their experiences on top of Scalding/Cascading
that should also be helpful for Crunch:

http://www.slideshare.net/Hadoop_Summit/hadoop-performance-optimization-at-scale-lessons-learned-at-twitter

Best,
Josh





