Posted to common-user@hadoop.apache.org by elton sky <el...@gmail.com> on 2010/06/19 05:14:49 UTC

How does MergeQueue.merge actually sort from different segments ??

Hello everyone,

I am going through the MapReduce source code. In MergeQueue.merge, I can only
see that the segments are collected into a list and sorted by length for
merging. However, I could not find where the (key, value) pairs inside the
segments are sorted by key.

Here is the function:

    RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
    .
    .
    .

        //if we have lesser number of segments remaining, then just return the
        //iterator, else do another single level merge
        if (numSegments <= factor) {
          // Reset totalBytesProcessed to track the progress of the final merge.
          // This is considered the progress of the reducePhase, the 3rd phase
          // of reduce task. Currently totalBytesProcessed is not used in sort
          // phase of reduce task(i.e. when intermediate merges happen).
          totalBytesProcessed = startBytes;

          //calculate the length of the remaining segments. Required for
          //calculating the merge progress
          long totalBytes = 0;
          for (int i = 0; i < segmentsToMerge.size(); i++) {
            totalBytes += segmentsToMerge.get(i).getLength();
          }
          if (totalBytes != 0) //being paranoid
            progPerByte = 1.0f / (float)totalBytes;

          if (totalBytes != 0)
            mergeProgress.set(totalBytesProcessed * progPerByte);
          else
            mergeProgress.set(1.0f); // Last pass and no segments left - we're done

          LOG.info("Down to the last merge-pass, with " + numSegments +
                   " segments left of total size: " + totalBytes + " bytes");
          return this;
        } else {
          LOG.info("Merging " + segmentsToMerge.size() +
                   " intermediate segments out of a total of " +
                   (segments.size()+segmentsToMerge.size()));

          //we want to spread the creation of temp files on multiple disks if
          //available under the space constraints
          long approxOutputSize = 0;
          for (Segment<K, V> s : segmentsToMerge) {
            approxOutputSize += s.getLength() +
                                ChecksumFileSystem.getApproxChkSumLength(
                                s.getLength());
          }
          Path tmpFilename =
            new Path(tmpDir, "intermediate").suffix("." + passNo);

          Path outputFile =  lDirAlloc.getLocalPathForWrite(
                                              tmpFilename.toString(),
                                              approxOutputSize, conf);

          Writer<K, V> writer =
            new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
                             writesCounter);
          writeFile(this, writer, reporter, conf);
          writer.close();

          //we finished one single level merge; now clean up the priority
          //queue
          this.close();

          // Add the newly create segment to the list of segments to be merged
          Segment<K, V> tempSegment =
            new Segment<K, V>(conf, fs, outputFile, codec, false);
          segments.add(tempSegment);
          numSegments = segments.size();
          Collections.sort(segments, segmentComparator);

          passNo++;
        }
        //we are worried about only the first pass merge factor. So reset the
        //factor to what it originally was
        factor = origFactor;
      } while(true);
    }

As far as I can see, if the number of segments is no more than factor, the
segments are returned (is this right?). Otherwise, factor segments are merged
at a time, pass by pass. But how do the <K,V> pairs in different segments get
sorted into order?

Elton
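
To make the pass structure in the listing above concrete, here is a rough
sketch that tracks segment lengths only. It assumes each intermediate pass
merges the `factor` smallest segments; the part of merge() that fills
segmentsToMerge is elided above, so that selection rule is an assumption. It
also ignores the special first-pass factor that the comments mention.
MergePassSketch and runIntermediatePasses are made-up names, not Hadoop code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class MergePassSketch {
    /**
     * Simulates the intermediate merge passes on segment lengths only and
     * returns the lengths of the segments left for the final merge.
     * Assumption: each pass merges the `factor` smallest segments.
     */
    static List<Long> runIntermediatePasses(List<Long> segmentLengths, int factor) {
        if (factor < 2) {
            throw new IllegalArgumentException("factor must be at least 2");
        }
        List<Long> segments = new ArrayList<>(segmentLengths);
        Collections.sort(segments);              // smallest segments first

        // Mirrors the "numSegments <= factor" test: stop once one pass can take them all.
        while (segments.size() > factor) {
            long mergedLength = 0;
            for (int i = 0; i < factor; i++) {
                mergedLength += segments.remove(0);   // take the current smallest
            }
            segments.add(mergedLength);          // the intermediate file becomes a new segment
            Collections.sort(segments);          // re-sort by length, as in the listing
        }
        return segments;
    }

    public static void main(String[] args) {
        List<Long> lengths = Arrays.asList(10L, 20L, 30L, 40L, 50L);
        System.out.println(runIntermediatePasses(lengths, 3)); // [40, 50, 60]
        System.out.println(runIntermediatePasses(lengths, 2)); // [60, 90]
    }
}
```

Note that nothing in this loop looks at keys. Sorting by length only decides
which segments are grouped into a pass.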

Re: How does MergeQueue.merge actually sort from different segments ??

Posted by elton sky <el...@gmail.com>.
Thanks for the reply, Yu.
But as far as I know, PriorityQueue, with its bottomUp and topDown functions,
is used to sort the list of Segments by their length, not the <K, V> pairs
inside a Segment. Are these two actually the same thing? I am quite confused;
can you give a little more detail?

Thanks
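
The two orderings asked about here can be kept apart with a small
illustration. This is only a sketch under assumptions: Seg is a hypothetical
summary of a segment (its length plus the key its reader currently points
at), and java.util.PriorityQueue stands in for Hadoop's own PriorityQueue
class. It assumes the heap compares segments by current key, which the
listing in this thread does not show.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class TwoOrderingsSketch {
    /** Hypothetical segment summary: its size and the key it is positioned on. */
    static final class Seg {
        final String name;
        final long length;
        final String currentKey;

        Seg(String name, long length, String currentKey) {
            this.name = name;
            this.length = length;
            this.currentKey = currentKey;
        }
    }

    /** Ordering 1: by length. This only decides which segments go into a pass. */
    static List<String> namesByLength(List<Seg> segs) {
        List<Seg> copy = new ArrayList<>(segs);
        copy.sort(Comparator.comparingLong((Seg s) -> s.length));
        List<String> names = new ArrayList<>();
        for (Seg s : copy) {
            names.add(s.name);
        }
        return names;
    }

    /** Ordering 2: by current key. This decides which record is emitted next. */
    static String segmentHoldingSmallestKey(List<Seg> segs) {
        PriorityQueue<Seg> heap =
            new PriorityQueue<>(Comparator.comparing((Seg s) -> s.currentKey));
        heap.addAll(segs);
        return heap.peek().name;
    }

    public static void main(String[] args) {
        List<Seg> segs = Arrays.asList(
            new Seg("big", 900, "apple"),
            new Seg("small", 100, "pear"),
            new Seg("mid", 400, "kiwi"));
        System.out.println(namesByLength(segs));             // [small, mid, big]
        System.out.println(segmentHoldingSmallestKey(segs)); // big
    }
}
```

The largest segment can still hold the smallest key, so the two orderings are
independent of each other.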

On Mon, Jun 21, 2010 at 1:04 AM, Ted Yu <yu...@gmail.com> wrote:

> Please note this:
>
> private static class MergeQueue<K extends Object, V extends Object>
>  extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
>
> A priority queue is used to accomplish the sorting.

Re: How does MergeQueue.merge actually sort from different segments ??

Posted by Ted Yu <yu...@gmail.com>.
Please note this:

private static class MergeQueue<K extends Object, V extends Object>
  extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {

A priority queue is used to accomplish the sorting.
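
As an illustration of how a priority queue of segments can yield key-sorted
output, here is a minimal k-way merge sketch. It is not the Hadoop
implementation: it uses java.util.PriorityQueue rather than Hadoop's own
PriorityQueue class, string keys instead of raw bytes, and a hypothetical
SegmentCursor in place of Segment. The assumption is that every segment is
already sorted and that the heap orders segments by the key each one is
currently positioned on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class KWayMergeSketch {
    /** Hypothetical stand-in for a segment: a cursor over already-sorted keys. */
    static final class SegmentCursor {
        private final Iterator<String> keys;
        private String current;

        SegmentCursor(List<String> sortedKeys) {
            this.keys = sortedKeys.iterator();
        }

        /** Moves to the next key; returns false once the segment is exhausted. */
        boolean advance() {
            if (keys.hasNext()) {
                current = keys.next();
                return true;
            }
            current = null;
            return false;
        }

        String currentKey() {
            return current;
        }
    }

    /** Merges individually sorted segments into one sorted list of keys. */
    static List<String> merge(List<List<String>> sortedSegments) {
        // The heap is ordered by each cursor's CURRENT key, not by segment length.
        PriorityQueue<SegmentCursor> heap =
            new PriorityQueue<>((a, b) -> a.currentKey().compareTo(b.currentKey()));
        for (List<String> segment : sortedSegments) {
            SegmentCursor cursor = new SegmentCursor(segment);
            if (cursor.advance()) {          // skip empty segments
                heap.add(cursor);
            }
        }

        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            SegmentCursor smallest = heap.poll();   // segment holding the smallest key
            out.add(smallest.currentKey());
            if (smallest.advance()) {
                heap.add(smallest);                 // re-insert under its new current key
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(Arrays.asList(
            Arrays.asList("a", "d", "g"),
            Arrays.asList("b", "e"),
            Arrays.asList("c", "f", "h"))));
        // prints [a, b, c, d, e, f, g, h]
    }
}
```

Each step costs one heap adjustment, so merging n records from k segments
takes on the order of n log k comparisons, and no segment is ever re-sorted
internally.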
