Posted to common-user@hadoop.apache.org by elton sky <el...@gmail.com> on 2010/06/19 05:14:49 UTC
How does MergeQueue.merge actually sort from different segments?
Hello everyone,

I am going through the MapReduce source code. In MergeQueue.merge, all I can
see is that the segments are combined and sorted by length into a list to be
merged. However, I could not find where the (key, value) pairs inside those
segments are sorted by key.

Here is the function:
RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
    ...

    //if we have lesser number of segments remaining, then just return the
    //iterator, else do another single level merge
    if (numSegments <= factor) {
      // Reset totalBytesProcessed to track the progress of the final merge.
      // This is considered the progress of the reducePhase, the 3rd phase
      // of reduce task. Currently totalBytesProcessed is not used in sort
      // phase of reduce task(i.e. when intermediate merges happen).
      totalBytesProcessed = startBytes;

      //calculate the length of the remaining segments. Required for
      //calculating the merge progress
      long totalBytes = 0;
      for (int i = 0; i < segmentsToMerge.size(); i++) {
        totalBytes += segmentsToMerge.get(i).getLength();
      }
      if (totalBytes != 0) //being paranoid
        progPerByte = 1.0f / (float)totalBytes;

      if (totalBytes != 0)
        mergeProgress.set(totalBytesProcessed * progPerByte);
      else
        mergeProgress.set(1.0f); // Last pass and no segments left - we're done

      LOG.info("Down to the last merge-pass, with " + numSegments +
               " segments left of total size: " + totalBytes + " bytes");
      return this;
    } else {
      LOG.info("Merging " + segmentsToMerge.size() +
               " intermediate segments out of a total of " +
               (segments.size()+segmentsToMerge.size()));

      //we want to spread the creation of temp files on multiple disks if
      //available under the space constraints
      long approxOutputSize = 0;
      for (Segment<K, V> s : segmentsToMerge) {
        approxOutputSize += s.getLength() +
                            ChecksumFileSystem.getApproxChkSumLength(
                                s.getLength());
      }
      Path tmpFilename =
        new Path(tmpDir, "intermediate").suffix("." + passNo);

      Path outputFile = lDirAlloc.getLocalPathForWrite(
                            tmpFilename.toString(),
                            approxOutputSize, conf);

      Writer<K, V> writer =
        new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
                         writesCounter);
      // iterates over this MergeQueue, appending each record to writer
      writeFile(this, writer, reporter, conf);
      writer.close();

      //we finished one single level merge; now clean up the priority
      //queue
      this.close();

      // Add the newly created segment to the list of segments to be merged
      Segment<K, V> tempSegment =
        new Segment<K, V>(conf, fs, outputFile, codec, false);
      segments.add(tempSegment);
      numSegments = segments.size();
      Collections.sort(segments, segmentComparator);

      passNo++;
    }
    //we are worried about only the first pass merge factor. So reset the
    //factor to what it originally was
    factor = origFactor;
  } while(true);
}
As I understand it, if the number of segments is at most factor, the
segments are returned (is this right?). Otherwise, factor segments at a time
are merged, pass by pass. But how do the <K,V> pairs from different segments
get sorted into order?
Elton
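The pass structure described above can be sketched in plain Java. This is a minimal, hypothetical model (MergePassSketch and planPasses are illustrative names, not Hadoop API). Each segment is reduced to its length, and the list is kept sorted smallest-first. While more than `factor` segments remain, the `factor` smallest are replaced by one intermediate segment whose length is their sum. The sketch assumes a fixed factor, ignores the adjusted first-pass factor hinted at by the origFactor comment in the quoted code, and does not model the records at all. Note that in the quoted code the final branch merges nothing eagerly. It returns `this`, the MergeQueue itself acting as a RawKeyValueIterator, so the last merge happens as the caller iterates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MergePassSketch {
    // Hypothetical model: a segment is represented only by its length.
    // Returns the lengths of the segments left over for the final (lazy) pass.
    static List<Long> planPasses(List<Long> lengths, int factor) {
        if (factor < 2) {
            throw new IllegalArgumentException("factor must be at least 2");
        }
        List<Long> segments = new ArrayList<>(lengths);
        Collections.sort(segments);                 // smallest segments first
        int passNo = 0;
        while (segments.size() > factor) {
            // Merge the `factor` smallest segments into one intermediate segment.
            List<Long> toMerge = new ArrayList<>(segments.subList(0, factor));
            segments.subList(0, factor).clear();
            long merged = 0;
            for (long len : toMerge) {
                merged += len;
            }
            segments.add(merged);
            Collections.sort(segments);             // re-sort by length after each pass
            passNo++;
            System.out.println("pass " + passNo + ": merged " + toMerge + " -> " + merged);
        }
        return segments;
    }

    public static void main(String[] args) {
        List<Long> remaining = planPasses(List.of(10L, 20L, 30L, 40L, 50L), 3);
        System.out.println("final pass merges " + remaining);
    }
}
```

With five segments and factor 3, one intermediate pass merges the three smallest (10, 20 and 30 into 60), leaving [40, 50, 60] for the final pass.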
Re: How does MergeQueue.merge actually sort from different segments?
Posted by elton sky <el...@gmail.com>.
Thanks for the reply, Yu.
But as far as I know, PriorityQueue and its heap functions (upHeap and
downHeap) are used to sort the list of Segments by their length, not the
<K, V> pairs inside a Segment. Are these two actually the same thing? I am
quite confused; could you give a little more detail?
Thanks
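The two orderings in question can be separated in a small sketch. It assumes a hypothetical Seg class that carries only a name, a length and the segment's current (head) key. None of these names come from Hadoop. Sorting by length, as Collections.sort(segments, segmentComparator) does in the quoted code, only picks which segments go into a pass. A heap ordered by head key is what would pick the segment the next record is read from.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class TwoOrderingsSketch {
    // Hypothetical stand-in for a Segment: its length and its current (head) key.
    static final class Seg {
        final String name;
        final long length;
        final String headKey;

        Seg(String name, long length, String headKey) {
            this.name = name;
            this.length = length;
            this.headKey = headKey;
        }
    }

    // Ordering 1, by length: chooses WHICH segments take part in a merge pass.
    static List<String> pickForPass(List<Seg> segments, int factor) {
        List<Seg> sorted = new ArrayList<>(segments);
        sorted.sort(Comparator.comparingLong((Seg s) -> s.length));
        List<String> names = new ArrayList<>();
        for (Seg s : sorted.subList(0, Math.min(factor, sorted.size()))) {
            names.add(s.name);
        }
        return names;
    }

    // Ordering 2, by current key: chooses which segment the NEXT record comes from.
    static String nextSource(List<Seg> segments) {
        PriorityQueue<Seg> heap =
            new PriorityQueue<Seg>(Comparator.comparing((Seg s) -> s.headKey));
        heap.addAll(segments);
        return heap.peek().name;
    }

    public static void main(String[] args) {
        List<Seg> segments = List.of(
            new Seg("s1", 500, "m"),
            new Seg("s2", 100, "x"),
            new Seg("s3", 300, "c"));
        System.out.println(pickForPass(segments, 2)); // [s2, s3]
        System.out.println(nextSource(segments));     // s3
    }
}
```

Here s2 is the shortest segment, so it is picked for the pass. The first record, however, comes from s3, because its head key "c" is the smallest.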
On Mon, Jun 21, 2010 at 1:04 AM, Ted Yu <yu...@gmail.com> wrote:
> Please note this:
>
> private static class MergeQueue<K extends Object, V extends Object>
> extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
>
> A priority queue is used to accomplish the sorting.
Re: How does MergeQueue.merge actually sort from different segments?
Posted by Ted Yu <yu...@gmail.com>.
Please note this:
private static class MergeQueue<K extends Object, V extends Object>
extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
A priority queue is used to accomplish the sorting.
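Here is a minimal sketch of how a priority queue turns several individually sorted segments into one sorted stream (a k-way merge). It uses java.util.PriorityQueue over String keys. KWayMergeSketch and Cursor are hypothetical names. Each inner list stands in for a segment whose records are already sorted by key, since map-side spills are sorted before they are written. The heap is ordered by each segment's current key, not its length, so poll() always yields the smallest unread key across all segments. As far as the Hadoop source goes, MergeQueue overrides lessThan() to compare the segments' current raw keys with the job's key comparator. Check this against the version you are reading.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {
    // One "segment": a sorted run plus its current (smallest unread) key.
    static final class Cursor {
        final Iterator<String> it;
        String current;

        Cursor(Iterator<String> it) {
            this.it = it;
            this.current = it.next(); // caller guarantees the run is non-empty
        }

        // Moves to the next key; returns false once the run is exhausted.
        boolean advance() {
            if (it.hasNext()) {
                current = it.next();
                return true;
            }
            return false;
        }
    }

    // Merges runs that are each already sorted into one globally sorted list.
    static List<String> merge(List<List<String>> sortedRuns) {
        // Heap ordered by each cursor's CURRENT key, not by run length.
        PriorityQueue<Cursor> heap =
            new PriorityQueue<Cursor>((a, b) -> a.current.compareTo(b.current));
        for (List<String> run : sortedRuns) {
            if (!run.isEmpty()) {
                heap.add(new Cursor(run.iterator()));
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor smallest = heap.poll();  // run whose head key is globally smallest
            out.add(smallest.current);
            if (smallest.advance()) {
                heap.add(smallest);         // re-insert with its new head key
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> runs = List.of(
            List.of("apple", "kiwi", "plum"),
            List.of("banana", "cherry"),
            List.of("fig", "lemon", "mango", "zucchini"));
        // prints [apple, banana, cherry, fig, kiwi, lemon, mango, plum, zucchini]
        System.out.println(merge(runs));
    }
}
```

java.util.PriorityQueue has no adjustTop(), so the sketch polls the cursor and re-adds it after advancing. Each record then costs O(log k) for k segments. Hadoop's own org.apache.hadoop.util.PriorityQueue, which MergeQueue extends, provides adjustTop() to re-sift the top element in place.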
On Fri, Jun 18, 2010 at 8:14 PM, elton sky <el...@gmail.com> wrote:
> Hello everyone,
>
> I am going through the MapReduce source code. In MergeQueue.merge, all I can
> see is that the segments are combined and sorted by length into a list to be
> merged. However, I could not find where the (key, value) pairs inside those
> segments are sorted by key.