Posted to common-dev@hadoop.apache.org by "Chris Douglas (JIRA)" <ji...@apache.org> on 2008/03/01 03:03:51 UTC

[jira] Updated: (HADOOP-2919) Create fewer copies of buffer data during sort/spill

     [ https://issues.apache.org/jira/browse/HADOOP-2919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2919:
----------------------------------

    Attachment: 2919-0.patch

This patch effects the following changes to improve our efficiency in this area. Instead of gradually growing our buffers, we use properties to determine the sizes of the K,V byte buffer and the accounting data, and allocate both up front. We maintain accounting information for the task as two arrays of ints (rather than separate arrays for each partition), mimicking the existing BufferSorter interface. The first stores offsets into the second, which holds the key and value offsets and the partition for each record. This permits us to swap offsets to effect the sort, as is presently implemented in BufferSorter, but without requiring us to wrap them in IntWritables.

{noformat}
kvoffset buffer        kvindices buffer
 _____________         _________________
|offset k1,v1 |       | partition k1,v1 |
|offset k1,v2 |       | k1 offset       |
     ...              | v1 offset       |
|offset kn,vn |       | partition k2,v2 |
                      | k2 offset       |
                      | v2 offset       |
                             ...
                      | partition kn,vn |
                      | kn offset       |
                      | vn offset       |
{noformat}
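The scheme in the figure can be sketched roughly as follows. This is a minimal illustration, not the patch's code: the class and constant names (KvAccounting, NMETA, etc.) are assumptions chosen for clarity.

```java
// Sketch of the two-int-array accounting scheme described above.
// kvoffsets[i] points at record i's entry in kvindices; each kvindices
// entry holds (partition, key offset, value offset). Sorting swaps only
// ints in kvoffsets, so no IntWritable wrappers are needed.
class KvAccounting {
    static final int PARTITION = 0, KEYSTART = 1, VALSTART = 2, NMETA = 3;
    final int[] kvoffsets;   // one int per record: offset into kvindices
    final int[] kvindices;   // NMETA ints per record

    KvAccounting(int maxRecords) {
        kvoffsets = new int[maxRecords];
        kvindices = new int[maxRecords * NMETA];
    }

    void record(int idx, int partition, int keyOff, int valOff) {
        int base = idx * NMETA;
        kvoffsets[idx] = base;
        kvindices[base + PARTITION] = partition;
        kvindices[base + KEYSTART] = keyOff;
        kvindices[base + VALSTART] = valOff;
    }

    // Swap two records during the sort: only the kvoffsets entries move.
    void swap(int i, int j) {
        int tmp = kvoffsets[i];
        kvoffsets[i] = kvoffsets[j];
        kvoffsets[j] = tmp;
    }

    int partitionOf(int i) { return kvindices[kvoffsets[i] + PARTITION]; }
}
```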

By default, the total size of the accounting space is 5% of io.sort.mb. We build on the work done in HADOOP-1965, but rather than using 50% of io.sort.mb before a spill, we set a "soft" limit that defaults to 80% of the number of records or 80% of the K,V buffer before starting a spill thread. Note that this limit does not require us to query each partition collector for its memory usage, but can be effected by examining our indices. Rather than permitting the spill thread to "own" references to the buffers, we maintain a set of indices into the offset and k,v byte buffers defining the area of each in which the spill buffer is permitted to work. According to the Java VM spec, we can assume that reading/writing array elements does not require a lock on the array.
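The sizing arithmetic above might look like the following sketch. The 16-byte-per-record figure (one kvoffsets slot plus three kvindices slots, four bytes each) and the method names are illustrative assumptions, not the patch's exact constants.

```java
// Sketch of deriving buffer sizes and "soft" spill limits from io.sort.mb:
// 5% of the space goes to accounting, and a spill starts at 80% of either
// the record capacity or the K,V byte buffer.
class SpillLimits {
    static final int RECSIZE = 4 * 4;  // bytes of accounting per record (assumed)

    static int sortMbBytes(int ioSortMb)   { return ioSortMb << 20; }
    static int acctBytes(int ioSortMb)     { return sortMbBytes(ioSortMb) / 20; }   // 5%
    static int kvBufferBytes(int ioSortMb) { return sortMbBytes(ioSortMb) - acctBytes(ioSortMb); }
    static int maxRecords(int ioSortMb)    { return acctBytes(ioSortMb) / RECSIZE; }

    // Soft limits: trigger the spill thread at 80% of either capacity.
    static int softRecordLimit(int ioSortMb) { return (int) (maxRecords(ioSortMb) * 0.8); }
    static int softBufferLimit(int ioSortMb) { return (int) (kvBufferBytes(ioSortMb) * 0.8); }
}
```

With the default io.sort.mb of 100, this yields a 5MB accounting region (327,680 records at 16 bytes each) and a spill trigger at 262,144 records.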

We maintain three indices for both the accounting and k,v buffers: start, end, and index. The area between start and end is available to the spill, while the area between end and index (strictly, a marker noting the end of the last complete record written) contains "spillable" data yet to be written to disk. If the soft limit is reached, or if a write into the buffer is too large to accommodate without a spill, the task thread sets the end index to the last record marker and triggers a spill. While the spill is running, the area between the start and end indices is unavailable for writing from collect(K,V), and the task thread will block until the spill has completed if the index marker reaches the start marker.

{noformat}
Buffer indices during a spill:
 ___________      ___________      ___________
|___________|    |___________|    |___________|
 ^     ^  ^ ^      ^  ^  ^  ^      ^  ^ ^   ^
 s     e  i v      i  s  e  v      e  i s   v
{noformat}
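The marker bookkeeping pictured above can be sketched as a small helper. This is an illustrative simplification (the names and the class itself are assumptions): the region [start, end) belongs to the spill, and collect(K,V) may write from index up to start, wrapping at the boundary.

```java
// Minimal sketch of the start/end/index markers for the circular k,v buffer.
// Not the patch's code; ignores the full/empty ambiguity when index == start.
class CircularMarkers {
    final int size;
    int start, end, index;  // offsets into the byte buffer

    CircularMarkers(int size) { this.size = size; }

    // Bytes available to collect(K,V) without overrunning the spill region.
    int writable() {
        return index >= start ? size - index + start : start - index;
    }

    // Advance the write index by len bytes, wrapping at the buffer boundary.
    void advance(int len) { index = (index + len) % size; }

    // The spill finished: the region it owned becomes writable again.
    void spillDone() { start = end; }
}
```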

It is worth mentioning that each key must be contiguous to be used with a RawComparator, but values can wrap around the end of the buffer. This requires us to note the "voided" space in the buffer that contains no data. When the spill completes, it sets the start marker to the end marker, making that space available for writing. Note that it must also reset the void marker to the buffer size if the spill wraps around the end of the buffer (the rightmost case in the preceding figure). The "voided" marker is owned by whichever thread needs to manipulate it, so we require no special locking for it.
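The voided-space bookkeeping might look like this sketch, where a key that would wrap is instead restarted at the front of the buffer and the unused tail bytes are marked void. All names here are assumptions for illustration.

```java
// Sketch of the "voided" region: if a key of keyLen bytes starting at
// writePos would run past the end of the buffer, it is placed contiguously
// at the front instead, and voidMark records where the dead tail begins so
// readers skip it. Values, by contrast, are allowed to wrap.
class VoidMarker {
    final byte[] buf;
    int voidMark;  // first unusable byte at the tail, or buf.length if none

    VoidMarker(int size) { buf = new byte[size]; voidMark = size; }

    // Returns the offset at which the key must start to stay contiguous.
    int placeKey(int writePos, int keyLen) {
        if (writePos + keyLen <= buf.length) return writePos;  // fits in place
        voidMark = writePos;  // bytes [writePos, buf.length) hold no data
        return 0;             // key restarts at the front of the buffer
    }

    // After a spill that wrapped past the end of the buffer.
    void resetVoid() { voidMark = buf.length; }
}
```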

When we sort, we sort all spill data by partition instead of creating a separate collector for each partition. Further, we can use appendRaw (as was suggested in HADOOP-1609) to write our serialized data directly from the k,v buffer to our spill file writer instead of deserializing each prior to the write. Note that for record-compressed data (when not using a combiner), this permits us to store compressed values in our k,v buffer.
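The single sort over all spill data compares records first by partition, then by raw key bytes. A simplified stand-in for that comparison (compareBytes here mimics the byte-wise ordering of Hadoop's WritableComparator, but is not its code):

```java
// Sketch of the spill sort's ordering: partition first, then an unsigned
// lexicographic compare of the serialized key bytes, so no deserialization
// is needed before writing with appendRaw.
class SpillComparator {
    static int compare(int part1, byte[] k1, int part2, byte[] k2) {
        if (part1 != part2) return part1 - part2;
        return compareBytes(k1, 0, k1.length, k2, 0, k2.length);
    }

    static int compareBytes(byte[] a, int aOff, int aLen,
                            byte[] b, int bOff, int bLen) {
        int n = Math.min(aLen, bLen);
        for (int i = 0; i < n; i++) {
            int d = (a[aOff + i] & 0xff) - (b[bOff + i] & 0xff);
            if (d != 0) return d;
        }
        return aLen - bLen;  // shorter key sorts first on a shared prefix
    }
}
```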

The attached patch is a work in progress, and is known to suffer from the following deficiencies:
* Very large keys and values (with a comparably small io.sort.mb) present a difficult problem for a statically allocated collection buffer. If a series of writes to an empty collection exceeds the space allocated to the k,v byte buffer (e.g. a 100MB k,v byte buffer and a Writable that attempts two 51MB write(byte[],int,int) calls), the current patch will loop forever; the same happens when the oversized writes come from separate records. The current patch only spills when the soft limit is reached.
* Handling of compression is inelegantly implemented. Again, this is a work in progress and will be cleaned up.
* The spill thread is created each time it is invoked, but it need not be.
* The code managing the contiguous key property is not as efficient as it could be.
* The implementation of QuickSort could be improved (re: Sedgewick) to handle the case where keys are equal to the pivot, probably a fairly common case.
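The last bullet refers to three-way partitioning (per Sedgewick): elements equal to the pivot are grouped in the middle and excluded from both recursive calls, which matters when many keys compare equal. A minimal int-array sketch of the idea (illustrative only, not the patch's sort):

```java
// Three-way ("Dutch national flag") quicksort: a[lo..lt-1] < pivot,
// a[lt..gt] == pivot, a[gt+1..hi] > pivot after partitioning, so
// duplicates of the pivot are never recursed over.
class ThreeWayQuickSort {
    static void sort(int[] a, int lo, int hi) {  // sorts a[lo..hi] inclusive
        if (lo >= hi) return;
        int lt = lo, gt = hi, i = lo + 1, pivot = a[lo];
        while (i <= gt) {
            if (a[i] < pivot)      swap(a, lt++, i++);
            else if (a[i] > pivot) swap(a, i, gt--);
            else                   i++;  // equal to pivot: leave in the middle
        }
        sort(a, lo, lt - 1);  // pivot-equal run a[lt..gt] is already placed
        sort(a, gt + 1, hi);
    }

    static void swap(int[] a, int i, int j) { int t = a[i]; a[i] = a[j]; a[j] = t; }
}
```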

> Create fewer copies of buffer data during sort/spill
> ----------------------------------------------------
>
>                 Key: HADOOP-2919
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2919
>             Project: Hadoop Core
>          Issue Type: Improvement
>          Components: mapred
>            Reporter: Chris Douglas
>         Attachments: 2919-0.patch
>
>
> Currently, the sort/spill works as follows:
> Let r be the number of partitions
> For each call to collect(K,V) from map:
> * If buffers do not exist, allocate a new DataOutputBuffer to collect K,V bytes, allocate r buffers for collecting K,V offsets
> * Write K,V into buffer, noting offsets
> * Register offsets with associated partition buffer, allocating/copying accounting buffers if necessary
> * Calculate the total mem usage for buffer and all partition collectors by iterating over the collectors
> * If total mem usage is greater than half of io.sort.mb, then start a new thread to spill, blocking if another spill is in progress
> For each spill (assuming no combiner):
> * Save references to our K,V byte buffer and accounting data, setting the former to null (will be recreated on the next call to collect(K,V))
> * Open a SequenceFile.Writer for this partition
> * Sort each partition separately (the current version of sort reuses, but still requires wrapping, indices in IntWritable objects)
> * Build a RawKeyValueIterator of sorted data for the partition
> * Deserialize each key and value and call SequenceFile::append(K,V) on the writer for this partition
> There are a number of opportunities for reducing the number of copies, creations, and operations we perform in this stage, particularly since growing many of the buffers involved requires that we copy the existing data to the newly sized allocation.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.