Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2014/11/06 22:54:53 UTC

hadoop git commit: MAPREDUCE-5958. Wrong reduce task progress if map output is compressed. Contributed by Emilio Coppa and Jason Lowe.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 167057801 -> 8f701ae07


MAPREDUCE-5958. Wrong reduce task progress if map output is compressed. Contributed by Emilio Coppa and Jason Lowe.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8f701ae0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8f701ae0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8f701ae0

Branch: refs/heads/trunk
Commit: 8f701ae07a0b1dc70b8e1eb8d4a5c35c0a1e76da
Parents: 1670578
Author: Kihwal Lee <ki...@apache.org>
Authored: Thu Nov 6 15:53:40 2014 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Thu Nov 6 15:53:40 2014 -0600

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../java/org/apache/hadoop/mapred/Merger.java   | 12 +--
 .../mapreduce/task/reduce/TestMerger.java       | 80 ++++++++++++++------
 3 files changed, 64 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
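
For context on the fix: the bug is a unit mismatch in the merge progress
math. Merger's progress denominator is derived from the segments' raw
(decompressed) data lengths, but the old code sampled Segment.getPosition(),
which for compressed map output reports an offset into the compressed
stream, so the numerator and denominator are measured in different units.
A toy illustration with invented numbers (not Hadoop code):

    public class UnitMismatchSketch {
      public static void main(String[] args) {
        long rawTotal = 3000;                // denominator: decompressed bytes
        float progPerByte = 1.0f / rawTotal;

        // Before the fix: numerator measured in compressed-stream offsets,
        // so reported progress lags far behind reality.
        long compressedPos = 30;             // e.g. 100:1 compression
        System.out.println(compressedPos * progPerByte); // ~0.01, not 1.0

        // After the fix: numerator is decompressed bytes actually read
        // (the reader's bytesRead counter), matching the denominator.
        long bytesRead = 3000;
        System.out.println(bytesRead * progPerByte);     // 1.0 at the end
      }
    }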


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f701ae0/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index fd42f82..573408e 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -462,6 +462,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5960. JobSubmitter's check whether job.jar is local is incorrect
     with no authority in job jar path. (Gera Shegalov via jlowe)
 
+    MAPREDUCE-5958. Wrong reduce task progress if map output is compressed
+    (Emilio Coppa and jlowe via kihwal)
+
 Release 2.5.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f701ae0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
index 9285516..b44e742 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
@@ -515,9 +515,9 @@ public class Merger {
     }
 
     private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
-      long startPos = reader.getPosition();
+      long startPos = reader.getReader().bytesRead;
       boolean hasNext = reader.nextRawKey();
-      long endPos = reader.getPosition();
+      long endPos = reader.getReader().bytesRead;
       totalBytesProcessed += endPos - startPos;
       mergeProgress.set(totalBytesProcessed * progPerByte);
       if (hasNext) {
@@ -543,7 +543,7 @@ public class Merger {
         }
       }
       minSegment = top();
-      long startPos = minSegment.getPosition();
+      long startPos = minSegment.getReader().bytesRead;
       key = minSegment.getKey();
       if (!minSegment.inMemory()) {
         //When we load the value from an inmemory segment, we reset
@@ -560,7 +560,7 @@ public class Merger {
       } else {
         minSegment.getValue(value);
       }
-      long endPos = minSegment.getPosition();
+      long endPos = minSegment.getReader().bytesRead;
       totalBytesProcessed += endPos - startPos;
       mergeProgress.set(totalBytesProcessed * progPerByte);
       return true;
@@ -638,9 +638,9 @@ public class Merger {
             // Initialize the segment at the last possible moment;
             // this helps in ensuring we don't use buffers until we need them
             segment.init(readsCounter);
-            long startPos = segment.getPosition();
+            long startPos = segment.getReader().bytesRead;
             boolean hasNext = segment.nextRawKey();
-            long endPos = segment.getPosition();
+            long endPos = segment.getReader().bytesRead;
             
             if (hasNext) {
               startBytes += endPos - startPos;

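The Merger.java hunks above all apply the same accounting pattern: sample
the reader's decompressed-byte counter before and after each read, then
charge the delta to the running total that drives the progress gauge. A
self-contained sketch of that pattern (SegmentLike is a hypothetical
stand-in for Merger.Segment; this is not the Hadoop class):

    import java.io.IOException;

    public class MergeProgressSketch {
      interface SegmentLike {
        long bytesRead();                 // decompressed bytes consumed so far
        boolean nextRawKey() throws IOException;
      }

      private long totalBytesProcessed;
      private final float progPerByte;    // 1.0f / total raw bytes to merge
      private float mergeProgress;        // stand-in for util.Progress

      MergeProgressSketch(long totalRawBytes) {
        this.progPerByte = 1.0f / totalRawBytes;
      }

      // Mirrors the fixed adjustPriorityQueue(): the delta and progPerByte
      // are both measured in decompressed bytes, so the ratio stays sound.
      boolean advance(SegmentLike reader) throws IOException {
        long startPos = reader.bytesRead();
        boolean hasNext = reader.nextRawKey();
        long endPos = reader.bytesRead();
        totalBytesProcessed += endPos - startPos;
        mergeProgress = totalBytesProcessed * progPerByte;
        return hasNext;
      }
    }
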
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f701ae0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
index c5ab420..651dd38 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
@@ -18,13 +18,12 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doAnswer;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -32,9 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -43,14 +41,15 @@ import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.IFile;
+import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MROutputFiles;
 import org.apache.hadoop.mapred.Merger;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -58,21 +57,17 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.CryptoUtils;
-import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
 import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
-import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.common.collect.Lists;
-
 public class TestMerger {
 
   private Configuration conf;
@@ -254,7 +249,7 @@ public class TestMerger {
     testMergeShouldReturnProperProgress(getUncompressedSegments());
   }
 
-  @SuppressWarnings( { "deprecation", "unchecked" })
+  @SuppressWarnings( { "unchecked" })
   public void testMergeShouldReturnProperProgress(
       List<Segment<Text, Text>> segments) throws IOException {
     Path tmpDir = new Path("localpath");
@@ -267,7 +262,38 @@ public class TestMerger {
     RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
         valueClass, segments, 2, tmpDir, comparator, getReporter(),
         readsCounter, writesCounter, mergePhase);
-    Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), 0.0f);
+    final float epsilon = 0.00001f;
+
+    // Reading 6 keys total, 3 each in 2 segments, so each key read moves the
+    // progress forward 1/6th of the way. Initially the first keys from each
+    // segment have been read as part of the merge setup, so progress = 2/6.
+    Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // The first next() returns one of the keys already read during merge setup
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // Subsequent next() calls should read one key and move progress
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon);
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // At this point we've exhausted all of the keys in one segment
+    // so getting the next key will return the already cached key from the
+    // other segment
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // Subsequent next() calls should read one key and move progress
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon);
+    Assert.assertTrue(mergeQueue.next());
+    Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
+
+    // Now there should be no more input
+    Assert.assertFalse(mergeQueue.next());
+    Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
   }
 
   private Progressable getReporter() {
@@ -281,7 +307,7 @@ public class TestMerger {
 
   private List<Segment<Text, Text>> getUncompressedSegments() throws IOException {
     List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
-    for (int i = 1; i < 1; i++) {
+    for (int i = 0; i < 2; i++) {
       segments.add(getUncompressedSegment(i));
     }
     return segments;
@@ -289,44 +315,51 @@ public class TestMerger {
 
   private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
     List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
-    for (int i = 1; i < 1; i++) {
+    for (int i = 0; i < 2; i++) {
       segments.add(getCompressedSegment(i));
     }
     return segments;
   }
 
   private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
-    return new Segment<Text, Text>(getReader(i), false);
+    return new Segment<Text, Text>(getReader(i, false), false);
   }
 
   private Segment<Text, Text> getCompressedSegment(int i) throws IOException {
-    return new Segment<Text, Text>(getReader(i), false, 3000l);
+    return new Segment<Text, Text>(getReader(i, true), false, 3000l);
   }
 
   @SuppressWarnings("unchecked")
-  private Reader<Text, Text> getReader(int i) throws IOException {
+  private Reader<Text, Text> getReader(int i, boolean isCompressedInput)
+      throws IOException {
     Reader<Text, Text> readerMock = mock(Reader.class);
+    when(readerMock.getLength()).thenReturn(30l);
     when(readerMock.getPosition()).thenReturn(0l).thenReturn(10l).thenReturn(
         20l);
     when(
         readerMock.nextRawKey(any(DataInputBuffer.class)))
-        .thenAnswer(getKeyAnswer("Segment" + i));
+        .thenAnswer(getKeyAnswer("Segment" + i, isCompressedInput));
     doAnswer(getValueAnswer("Segment" + i)).when(readerMock).nextRawValue(
         any(DataInputBuffer.class));
 
     return readerMock;
   }
 
-  private Answer<?> getKeyAnswer(final String segmentName) {
+  private Answer<?> getKeyAnswer(final String segmentName,
+      final boolean isCompressedInput) {
     return new Answer<Object>() {
       int i = 0;
 
+      @SuppressWarnings("unchecked")
       public Boolean answer(InvocationOnMock invocation) {
-        Object[] args = invocation.getArguments();
-        DataInputBuffer key = (DataInputBuffer) args[0];
-        if (i++ == 2) {
+        if (i++ == 3) {
           return false;
         }
+        Reader<Text,Text> mock = (Reader<Text,Text>) invocation.getMock();
+        int multiplier = isCompressedInput ? 100 : 1;
+        mock.bytesRead += 10 * multiplier;
+        Object[] args = invocation.getArguments();
+        DataInputBuffer key = (DataInputBuffer) args[0];
         key.reset(("Segment Key " + segmentName + i).getBytes(), 20);
         return true;
       }
@@ -340,9 +373,6 @@ public class TestMerger {
       public Void answer(InvocationOnMock invocation) {
         Object[] args = invocation.getArguments();
         DataInputBuffer key = (DataInputBuffer) args[0];
-        if (i++ == 2) {
-          return null;
-        }
         key.reset(("Segment Value " + segmentName + i).getBytes(), 20);
         return null;
       }
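
A side note on the new test: Mockito mocks are real subclasses, so an
instance field such as IFile.Reader's bytesRead is ordinary state even
though the methods are stubbed. The key Answer above reaches the mock
through invocation.getMock() and bumps that field as a side effect of each
nextRawKey() call, which is what lets Merger observe believable byte
counts. A minimal standalone sketch of the technique (Counter is a made-up
class, not the Hadoop reader):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    public class FieldBumpingMockSketch {
      public static class Counter {
        public long bytesRead;            // a real field, like the reader's
        public boolean next() { return false; }
      }

      public static void main(String[] args) {
        Counter c = mock(Counter.class);
        when(c.next()).thenAnswer(new Answer<Boolean>() {
          public Boolean answer(InvocationOnMock invocation) {
            // Fields are not intercepted by Mockito, so this mutation sticks.
            ((Counter) invocation.getMock()).bytesRead += 10;
            return true;
          }
        });

        c.next();
        c.next();
        System.out.println(c.bytesRead);  // prints 20
      }
    }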