Posted to common-dev@hadoop.apache.org by "Scott Carey (JIRA)" <ji...@apache.org> on 2009/06/10 20:01:07 UTC

[jira] Commented: (HADOOP-5223) Refactor reduce shuffle code

    [ https://issues.apache.org/jira/browse/HADOOP-5223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12718153#action_12718153 ] 

Scott Carey commented on HADOOP-5223:
-------------------------------------

I've had issues with shuffle on 0.19. On a cluster with new hardware capable of running 13+ maps and 11+ reduces concurrently per node (dual quad-core with hyperthreading = 16 hardware threads, 24GB RAM, 4 drives), shuffle is always my bottleneck on any job where the maps aren't huge or where they condense data down significantly before the reduce.   During this bottleneck, disk, network, and CPU are calm.  I collected quite a few thread dumps in this state across many different jobs.  Increasing parallel copies and tasktracker http threads had no effect.  For the most part, the thread dumps showed the shuffle fetch threads idle, and the main thread here:

"main" prio=10 tid=0x000000005eed3800 nid=0x62a2 waiting on condition [0x0000000040eb4000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.getMapCompletionEvents(ReduceTask.java:2244)
	at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.fetchOutputs(ReduceTask.java:1716)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:368)
	at org.apache.hadoop.mapred.Child.main(Child.java:158)


There is a one-line fix.   However, since this JIRA is refactoring the code that the fix applies to, I'll post my info here to make sure the improvements are contained in it.
The fix improves times by 40% on a composite set of jobs (a cascading flow consisting of 25+ map/reduce jobs, with up to 7 running concurrently).

First, here is the fix I made:

Comment out or delete the line:
{code} break; //we have a map from this host {code}
in ReduceTask.java, in ReduceCopier.fetchOutputs()
-- line 1804 in 0.19.2-dev, line 1911 in 0.20.1-dev, and line 1954 currently on trunk.
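
For illustration only -- this is not the Hadoop source, just a toy standalone sketch (the class and variable names are made up) of the loop shape described above and what the removed break does:

{code}
import java.util.*;

/** Toy model of the per-pass scheduling shape -- not Hadoop code. */
public class ShufflePassSketch {
  public static void main(String[] args) {
    // Hypothetical setup: 3 hosts, each still holding 17 map outputs this reduce needs.
    Map<String, Deque<String>> pendingByHost = new LinkedHashMap<>();
    for (String host : Arrays.asList("tt1", "tt2", "tt3")) {
      Deque<String> outputs = new ArrayDeque<>();
      for (int i = 0; i < 17; i++) {
        outputs.add(host + "-map-" + i);
      }
      pendingByHost.put(host, outputs);
    }

    int remaining = 3 * 17;
    int passes = 0;
    while (remaining > 0) {
      passes++;
      for (Deque<String> outputs : pendingByHost.values()) {
        while (!outputs.isEmpty()) {
          outputs.poll();   // "schedule" a copy from this host
          remaining--;
          break;            // <-- the removed line: with it, one output per host per pass;
                            //     delete it and the inner loop drains this host's whole list
        }
      }
      // ...the real code sleeps here before asking for fresh map-completion events...
    }
    // Prints 17 with the break in place; delete the break and it prints 1.
    System.out.println("passes (each separated by a sleep): " + passes);
  }
}
{code}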

On my set of jobs, reduces that previously averaged 1 to 2 minutes in the shuffle phase while copying little data now average 1 to 3 seconds in the shuffle.

Here is the problem.  The shuffle currently limits itself to copying only one shard from a given host per pass, and then sleeps.  As long as the number of map shards is much larger than the number of hosts, this requires quite a few sleep delays.  For servers that can each handle many map and reduce tasks, this gets out of hand quickly, especially on small or medium-sized clusters where the ideal number of concurrent shuffle copies per reduce is on the order of, or larger than, the number of hosts.
  
A more sophisticated fix such as this JIRA will do more, but the low-hanging-fruit performance fix is to grab every shard reported by the last ping before sleeping again and checking for more.  This not only improves shuffle speed, it also reduces the total number of pings needed to find out which shards are available, which reduces load elsewhere.  It makes little sense to do what happens now on a small cluster:
discover that, say, 100 shards need to be fetched, grab 8 of them, then sleep, ask again what is available, grab only 8, sleep ...
At the very least, if there are 100 map outputs available to a reducer, it should keep draining that list before sleeping and asking for an updated set.
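
To put rough numbers on that (a back-of-envelope sketch using the dev-cluster figures from the logs below -- 3 hosts, 10 fetch threads per reduce -- not a measurement, and not Hadoop code):

{code}
/** Rough count of sleep/ping cycles for the two strategies above -- illustrative only. */
public class ShuffleCycleCount {
  // Sleep/ping cycles needed to schedule `total` known outputs when at most
  // `perCycle` of them can be scheduled before sleeping and re-asking.
  static int cycles(int total, int perCycle) {
    return (total + perCycle - 1) / perCycle; // ceiling division
  }

  public static void main(String[] args) {
    int knownOutputs = 100; // map outputs the reducer already knows are available
    int hosts = 3;          // current behavior schedules one output per host per pass
    System.out.println("sleep cycles, one-per-host-per-pass: " + cycles(knownOutputs, hosts)); // 34
    // Draining the known list before sleeping needs only one re-ping afterwards,
    // and the copy rate is bounded by the fetch threads, not by the sleep interval.
    System.out.println("sleep cycles, drain-before-sleep:    1");
  }
}
{code}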

Some may object to opening more than one concurrent connection to a host on the basis that it could overload a tasktracker -- but that seems like a false assumption to me.  First, tasktrackers throttle this with the configuration parameter for the number of http threads.  Second, reduces throttle this with the number of concurrent shuffle fetch threads.   There is no difference between a reduce opening 10 concurrent shuffle connections to 10 hosts and opening 10 to one host: when all reduces are doing this concurrently and choosing hosts at random, the average number of concurrent connections on any one TT remains the same.
If it is a serious concern for other reasons (the 'penalty box'? or other error handling?), then the shuffle queue could be filled in a better order than one host at a time, or at the very least the reducer should not sleep and re-fetch the list without first draining it.  A more significant refactor may do better than the one-liner -- but I suspect this alone accounts for most of the performance gain.
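
As a back-of-envelope check of that claim (plain arithmetic using the dev-cluster numbers from the log section below, not Hadoop code; the class name is made up):

{code}
/** Expected shuffle connection load per TT -- rough arithmetic, not Hadoop code. */
public class ShuffleConnectionEstimate {
  public static void main(String[] args) {
    int tasktrackers = 3;            // 3 TTs in the dev cluster below
    int reduces = 3 * 11;            // 11 concurrent reduces per TT
    int fetchThreadsPerReduce = 10;  // concurrent shuffle copies per reduce
    int httpThreadsPerTT = 40;       // the tasktracker-side throttle

    // With hosts chosen at random, the expected connection demand per TT is the
    // same whether one reduce opens 10 connections to one host or 1 connection
    // to each of 10 hosts -- either way, the TT's http-thread limit is what throttles.
    double demandPerTT = (double) reduces * fetchThreadsPerReduce / tasktrackers;
    System.out.printf("worst-case demand per TT: %.0f connections, served by %d http threads%n",
        demandPerTT, httpThreadsPerTT);
  }
}
{code}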


Here is a sample log from before and after the change on 0.19, on a small dev cluster with newer hardware -- a particularly bad case for this:
3 TTs, each configured for 13 concurrent maps, 11 concurrent reduces, 10 concurrent shuffle copies, and 40 TT http threads:

Before: {code}
2009-06-09 22:13:53,657 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0 Need another 51 map output(s) where 0 is already in progress
2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring obsolete output of KILLED map-task: 'attempt_200906082206_0006_m_000050_0'
2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0: Got 51 new map-outputs
2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0: Got 1 obsolete map-outputs from tasktracker 
2009-06-09 22:13:53,672 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0 Scheduled 3 outputs (0 slow hosts and0 dup hosts)
2009-06-09 22:13:53,689 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 54394 bytes (54398 raw bytes) into RAM from attempt_200906082206_0006_m_000014_0
2009-06-09 22:13:53,690 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 70736 bytes (70740 raw bytes) into RAM from attempt_200906082206_0006_m_000003_0
2009-06-09 22:13:53,690 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 73540 bytes (73544 raw bytes) into RAM from attempt_200906082206_0006_m_000001_0
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 54394 bytes from map-output for attempt_200906082206_0006_m_000014_0
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 70736 bytes from map-output for attempt_200906082206_0006_m_000003_0
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Read 73540 bytes from map-output for attempt_200906082206_0006_m_000001_0
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082206_0006_m_000014_0 -> (21, 205) from 10.3.0.142
2009-06-09 22:13:53,692 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082206_0006_m_000003_0 -> (21, 240) from 10.3.0.143
2009-06-09 22:13:53,693 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082206_0006_m_000001_0 -> (21, 204) from 10.3.0.141
2009-06-09 22:13:55,662 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082206_0006_r_000004_0 Scheduled 3 outputs (0 slow hosts and0 dup hosts)
  
 -- SNIP --

2009-06-09 22:14:49,753 INFO org.apache.hadoop.mapred.ReduceTask: Read 79913 bytes from map-output for attempt_200906082206_0006_m_000042_0
2009-06-09 22:14:49,753 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082206_0006_m_000042_0 -> (21, 237) from 10.3.0.141
2009-06-09 22:14:50,751 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
2009-06-09 22:14:50,751 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 0 files left.
2009-06-09 22:14:50,752 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 51 files left.
2009-06-09 22:14:50,813 INFO org.apache.hadoop.mapred.Merger: Merging 51 sorted segments
2009-06-09 22:14:50,817 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 51 segments left of total size: 3325252 bytes
{code}

After  -- (slightly different job): {code}
2009-06-08 23:51:07,057 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0 Thread waiting: Thread for merging on-disk files
2009-06-08 23:51:07,058 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0 Need another 68 map output(s) where 0 is already in progress
2009-06-08 23:51:07,069 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring obsolete output of KILLED map-task: 'attempt_200906082336_0014_m_000060_1'
2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask: Ignoring obsolete output of KILLED map-task: 'attempt_200906082336_0014_m_000014_0'
2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0: Got 68 new map-outputs
2009-06-08 23:51:07,070 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0: Got 2 obsolete map-outputs from tasktracker 
2009-06-08 23:51:07,071 INFO org.apache.hadoop.mapred.ReduceTask: attempt_200906082336_0014_r_000009_0 Scheduled 68 outputs (0 slow hosts and0 dup hosts)
2009-06-08 23:51:07,106 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 674904 bytes (674908 raw bytes) into RAM from attempt_200906082336_0014_m_000005_0
2009-06-08 23:51:07,110 INFO org.apache.hadoop.mapred.ReduceTask: Read 674904 bytes from map-output for attempt_200906082336_0014_m_000005_0
2009-06-08 23:51:07,110 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082336_0014_m_000005_0 -> (61, 26) from 10.3.0.143

-- SNIP --

2009-06-08 23:51:08,389 INFO org.apache.hadoop.mapred.ReduceTask: Read 1439739 bytes from map-output for attempt_200906082336_0014_m_000012_1
2009-06-08 23:51:08,389 INFO org.apache.hadoop.mapred.ReduceTask: Rec #1 from attempt_200906082336_0014_m_000012_1 -> (50, 25) from 10.3.0.141
2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 0 files left.
2009-06-08 23:51:09,064 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 68 files left.
2009-06-08 23:51:09,122 INFO org.apache.hadoop.mapred.Merger: Merging 68 sorted segments
2009-06-08 23:51:09,126 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 52 segments left of total size: 44450255 bytes
{code}


> Refactor reduce shuffle code
> ----------------------------
>
>                 Key: HADOOP-5223
>                 URL: https://issues.apache.org/jira/browse/HADOOP-5223
>             Project: Hadoop Core
>          Issue Type: Improvement
>          Components: mapred
>            Reporter: Owen O'Malley
>            Assignee: Owen O'Malley
>             Fix For: 0.21.0
>
>         Attachments: HADOOP-5233_api.patch, HADOOP-5233_part0.patch
>
>
> The reduce shuffle code has become very complex and entangled. I think we should move it out of ReduceTask and into a separate package (org.apache.hadoop.mapred.task.reduce). Details to follow.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.