Posted to dev@spark.apache.org by Shixiong Zhu <zs...@gmail.com> on 2014/11/21 04:14:09 UTC

Re: Eliminate copy while sending data : any Akka experts here ?

Is it possible for Spark to buffer the mapOutputStatuses (Array[Byte])
messages and throttle them according to the total size of the
mapOutputStatuses that have already been sent but not yet ACKed? The buffer
will be cheap, since the mapOutputStatuses messages are identical and the
memory cost is only a few pointers.

Best Regards,
Shixiong Zhu

2014-09-20 16:24 GMT+08:00 Reynold Xin <rx...@databricks.com>:

> BTW - a partial solution here: https://github.com/apache/spark/pull/2470
>
> This doesn't address the 0 size block problem yet, but makes my large job
> on hundreds of terabytes of data much more reliable.
>
>
> On Fri, Jul 4, 2014 at 2:28 AM, Mridul Muralidharan <mr...@gmail.com>
> wrote:
>
> > In our clusters, the number of containers we can get is high but memory
> > per container is low, which is why avg_nodes_not_hosting data is
> > rarely zero for ML tasks :-)
> >
> > To update: to unblock our current implementation efforts, we went
> > with broadcast, since it is intuitively easier and a minimal change, and
> > we compress the array as bytes in TaskResult.
> > This is then stored in disk-backed maps to remove memory pressure on
> > the master and workers (otherwise MapOutputTracker becomes a memory hog).
> >
> > But I agree: a compressed bitmap to represent 'large' blocks (anything
> > larger than maxBytesInFlight, actually), and probably the existing
> > approach to track non-zero blocks, should be fine (we should not really
> > track zero output for a reducer; that is just a waste of space).
> >
> >
> > Regards,
> > Mridul
> >
> > On Fri, Jul 4, 2014 at 3:43 AM, Reynold Xin <rx...@databricks.com> wrote:
> > > Note that in my original proposal, I was suggesting we could track
> > > whether block size = 0 using a compressed bitmap. That way we can
> > > still avoid requests for zero-sized blocks.
> > >
> > >
> > >
> > > On Thu, Jul 3, 2014 at 3:12 PM, Reynold Xin <rx...@databricks.com>
> > > wrote:
> > >
> > >> Yes, that number is likely == 0 in any real workload ...
> > >>
> > >>
> > >> On Thu, Jul 3, 2014 at 8:01 AM, Mridul Muralidharan <mridul@gmail.com>
> > >> wrote:
> > >>
> > >>> On Thu, Jul 3, 2014 at 11:32 AM, Reynold Xin <rx...@databricks.com>
> > >>> wrote:
> > >>> > On Wed, Jul 2, 2014 at 3:44 AM, Mridul Muralidharan
> > >>> > <mridul@gmail.com> wrote:
> > >>> >
> > >>> >>
> > >>> >> >
> > >>> >> > The other thing we do need is the location of blocks. This is
> > >>> >> > actually just O(n) because we just need to know where the map
> > >>> >> > was run.
> > >>> >>
> > >>> >> For well-partitioned data, won't this involve a lot of unwanted
> > >>> >> requests to nodes which are not hosting data for a reducer (and
> > >>> >> a lack of ability to throttle)?
> > >>> >>
> > >>> >
> > >>> > Was that a question? (I'm guessing it is). What do you mean
> > >>> > exactly?
> > >>>
> > >>>
> > >>> I was not sure if I understood the proposal correctly, hence the
> > >>> query: if I understood it right, the number of wasted requests goes
> > >>> up by num_reducers * avg_nodes_not_hosting data.
> > >>>
> > >>> Of course, if avg_nodes_not_hosting data == 0, then we are fine!
> > >>>
> > >>> Regards,
> > >>> Mridul
> > >>>
> > >>
> > >>
> >
>
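
For readers following the quoted discussion, the idea of tracking whether
block size = 0 in a bitmap can be sketched roughly as below. This is only an
illustration, not Spark code: the function names are hypothetical, and a plain
Python integer stands in for the compressed bitmap, which a real
implementation would presumably replace with a proper compressed structure.

```python
# Hypothetical sketch: names are illustrative and are not Spark APIs.
# An uncompressed int is used as the bitmap purely to show the idea.

def build_empty_bitmap(block_sizes):
    """Return an int whose bit r is set when the block for reducer r is empty."""
    bitmap = 0
    for reducer_id, size in enumerate(block_sizes):
        if size == 0:
            bitmap |= 1 << reducer_id
    return bitmap


def is_empty(bitmap, reducer_id):
    """True if this map output holds no data for the given reducer."""
    return (bitmap >> reducer_id) & 1 == 1


def reducers_to_fetch(bitmap, num_reducers):
    """Reducer ids that actually need to request a block from this map output."""
    return [r for r in range(num_reducers) if not is_empty(bitmap, r)]


# One map task's output sizes (in bytes) for five reducers.
sizes = [0, 4096, 0, 0, 128]
bitmap = build_empty_bitmap(sizes)
fetch = reducers_to_fetch(bitmap, len(sizes))
```

With such a bitmap, a reducer whose bit is set can skip the request to that
map output entirely, which is the wasted-request case raised in the quoted
thread.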

Re: Eliminate copy while sending data : any Akka experts here ?

Posted by Shixiong Zhu <zs...@gmail.com>.
MapOutputTrackerMasterActor first puts the `mapOutputStatuses` messages into
a buffer. Background threads then send the messages in this buffer. Before
sending, each thread checks whether too many messages have already been
handed to Akka. If so, it waits until there is enough memory.

I put a commit for this idea here:
https://github.com/zsxwing/spark/commit/c998856cdf747aa0452d030e58c3c2dd4ef7f97d
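
A rough sketch of the buffering scheme described above, for illustration only.
It is not taken from the commit and does not use Akka: the class name
`ThrottledSender`, the `send` callback, the explicit `ack` call, and the byte
budget are all assumptions made for the example.

```python
import queue
import threading


class ThrottledSender:
    """Buffers outgoing messages and caps the bytes sent but not yet ACKed."""

    def __init__(self, send, max_in_flight_bytes):
        self._send = send                  # hypothetical transport callback
        self._max = max_in_flight_bytes    # assumed budget for un-ACKed bytes
        self._in_flight = 0
        self._cond = threading.Condition()
        self._buffer = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def enqueue(self, dest, payload):
        # Only a reference to payload is stored, so queueing the same
        # bytes object for many destinations costs a few pointers each.
        self._buffer.put((dest, payload))

    def ack(self, payload):
        # Called when the receiver confirms delivery; frees up budget.
        with self._cond:
            self._in_flight -= len(payload)
            self._cond.notify_all()

    def _run(self):
        while True:
            dest, payload = self._buffer.get()
            with self._cond:
                # Wait until the message fits in the budget. A message larger
                # than the whole budget still goes out once nothing is in flight.
                while (self._in_flight > 0
                       and self._in_flight + len(payload) > self._max):
                    self._cond.wait()
                self._in_flight += len(payload)
            self._send(dest, payload)


# Usage: one shared 8-byte payload, three destinations, a 10-byte budget,
# so only one message can be in flight until it is ACKed.
sent = []
sender = ThrottledSender(lambda dest, payload: sent.append(dest),
                         max_in_flight_bytes=10)
statuses = bytes(8)
for dest in ("a", "b", "c"):
    sender.enqueue(dest, statuses)
```

A single background thread is used here to keep the sketch short. With
several sender threads the same condition variable would still guard the
in-flight counter.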

Best Regards,
Shixiong Zhu

2014-11-21 12:28 GMT+08:00 Reynold Xin <rx...@databricks.com>:

> Can you elaborate? Not 100% sure if I understand what you mean.

Re: Eliminate copy while sending data : any Akka experts here ?

Posted by Reynold Xin <rx...@databricks.com>.
Can you elaborate? Not 100% sure if I understand what you mean.
