You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Thomas Graves (JIRA)" <ji...@apache.org> on 2016/03/21 20:28:25 UTC

[jira] [Comment Edited] (SPARK-1239) Don't fetch all map output statuses at each reducer during shuffles

    [ https://issues.apache.org/jira/browse/SPARK-1239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15204900#comment-15204900 ] 

Thomas Graves edited comment on SPARK-1239 at 3/21/16 7:27 PM:
---------------------------------------------------------------

So I have been looking at this and testing a few changes out.

There are a few issues here but if we are looking at solving the driver memory bloat issue then I think this comes down to flow control issue.  The Driver is trying to respond to all the map status requests and is shoving them out to Netty quicker then netty can send them and we end up using a lot of memory very quickly.  Yes you can try to reduce the size of the MapStatuses but you can only do that to a point and you could still have the this issue.

There are multiple possible ways to solve this.  The approach I have been looking at is having the MapOutputTracker have its own queue and thread pool for handling requests.  This gives us the flexibility to do multiple things:

- We can make the reply synchronous (ie it waits for response from netty to start next reply) without blocking the normal dispatcher threads which do things like handling heartbeats, thus giving us flow control. We can decide to do this only if the map output status are above a certain size or do it all the time.  You can adjust the thread pool size to handle more in parallel.  you could make this more sophisticated in the future if we want to have some sort of send queue rather then blocking each thread.  
- We can easily synchronize incoming requests without blocking dispatcher threads so we don't serialize the same MapStatus multiple times.  Background - one other problem I've been seeing is that you get a bunch of requests for map status in at once, we have a lot of dispatchers threads running in parallel, all of those do the check to see if the map status is cached, all of them report its not, and you have multiple threads all serializing the exact same map output statuses.
- doesn't limit us with sending map status with Task data.  ie if we want to change Spark in the future to start Reducer tasks before all map tasks finish (MapReduce does this now) this more easily works with that.  
 
I still need to do some more testing on this but I wanted to see what people thought of this approach?

What I have implemented right now is a queue and threadpool in the MapOutputTracker to handle the requests.  if its over 5MB (still deciding on this size) then when it replies it waits for it to actually send before grabbing the next request.
For the second bullet above I did a somewhat simpler approach for now and when registerMapOutputs is called I have it cache the map status output then instead of waiting for a request to come in. This helps as it will make sure the last one is cached but if you have multiple then the others still won't be in the cache.  We could either have it cache more or take an approach like I mention above to have it just synchronize and cache one upon the first request.

One of the large jobs I'm using to test this is shuffling 15TB of data using 202000 map tasks going down to 500 reducers.  The driver originally was using 20GB of memory, with my changes I was able to successfully run it with 5GB.
The job has 50mb of serialized map output statuses and before my changes it took executors 40-70 seconds to fetch the map output status, with my change using 8 threads it took roughly the same.  I need to get some more exact statistics here.


was (Author: tgraves):
So I have been looking at this and testing a few changes out.

There are a few issues here but if we are looking at solving the driver memory bloat issue then I think this comes down to flow control issue.  The Driver is trying to respond to all the map status requests and is shoving them out to Netty quicker then netty can send them and we end up using a lot of memory very quickly.  Yes you can try to reduce the size of the MapStatuses but you can only do that to a point and you could still have the this issue.

There are multiple possible ways to solve this.  The approach I have been looking at is having the MapOutputTracker have its own queue and thread pool for handling requests.  This gives us the flexibility to do multiple things:

- We can make the reply synchronous (ie it waits for response from netty to start next reply) without blocking the normal dispatcher threads which do things like handling heartbeats, thus giving us flow control. We can decide to do this only if the map output status are above a certain size or do it all the time.  You can adjust the thread pool size to handle more in parallel.  you could make this more sophisticated in the future if we want to have some sort of send queue rather then blocking each thread.  
- We can easily synchronize incoming requests without blocking dispatcher threads so we don't serialize the same MapStatus multiple times.  Background - one other problem I've been seeing is that you get a bunch of requests for map status in at once, we have a lot of dispatchers threads running in parallel, all of those do the check to see if the map status is cached, all of them report its not, and you have multiple threads all serializing the exact same map output statuses.
- doesn't limit us with sending map status with Task data.  ie if we want to change Spark in the future to start Reducer tasks before all map tasks finish (MapReduce does this now) this more easily works with that.  
 
I still need to do some more testing on this but I wanted to see what people thought of this approach?

What I have implemented right now is a queue and threadpool in the MapOutputTracker to handle the requests.  if its over 5MB (still deciding on this size) then when it replies it waits for it to actually send before grabbing the next request.
For the second bullet above I did a somewhat simpler approach for now and when registerMapOutputs is called I have it cache the map status output then instead of waiting for a request to come in. This helps as it will make sure the last one is cached but if you have multiple then the others still won't be in the cache.  We could either have it cache more or take an approach like I mention above to have it just synchronize and cache one upon the first request.

One of the large jobs I'm using to test this is shuffling 15TB of data using 202000 map tasks going down to 500 reducers.  The driver originally was using 20GB of memory, with my changes I was able to successfully run it with 5GB.

> Don't fetch all map output statuses at each reducer during shuffles
> -------------------------------------------------------------------
>
>                 Key: SPARK-1239
>                 URL: https://issues.apache.org/jira/browse/SPARK-1239
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 1.0.2, 1.1.0
>            Reporter: Patrick Wendell
>            Assignee: Thomas Graves
>
> Instead we should modify the way we fetch map output statuses to take both a mapper and a reducer - or we should just piggyback the statuses on each task. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org