You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ra...@apache.org on 2012/03/05 14:44:02 UTC

svn commit: r1297052 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/

Author: ravigummadi
Date: Mon Mar  5 13:44:01 2012
New Revision: 1297052

URL: http://svn.apache.org/viewvc?rev=1297052&view=rev
Log:
MAPREDUCE-2722. [Gridmix] Gridmix simulated job's map's hdfsBytesRead counter is wrong when compressed input is used. (ravigummadi)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1297052&r1=1297051&r2=1297052&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Mar  5 13:44:01 2012
@@ -52,8 +52,12 @@ Trunk (unreleased changes)
 
   BUG FIXES
 
+    MAPREDUCE-2722. [Gridmix] Gridmix simulated job's map's hdfsBytesRead
+                    counter is wrong when compressed input is used. (ravigummadi)
+
     MAPREDUCE-3757. [Rumen] Fixed Rumen Folder to adjust shuffleFinished and
-                    sortFinished times when needed.
+                    sortFinished times when needed. (ravigummadi)
+
     MAPREDUCE-3194. "mapred mradmin" command is broken in mrv2
                      (Jason Lowe via bobby)
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java?rev=1297052&r1=1297051&r2=1297052&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java Mon Mar  5 13:44:01 2012
@@ -571,4 +571,25 @@ class CompressionEmulationUtil {
     }
     setInputCompressionEmulationEnabled(target, needsCompressedInput);
   }
+
+  /**
+   * Get the uncompressed input bytes count from the given possibly compressed
+   * input bytes count.
+   * @param possiblyCompressedInputBytes input bytes count. This is the
+   *        compressed input size if input compression emulation is enabled.
+   * @param conf configuration of the Gridmix simulated job
+   * @return uncompressed input bytes count: the given count divided by the
+   *         map input compression ratio if compression emulation is on
+   */
+  static long getUncompressedInputBytes(long possiblyCompressedInputBytes,
+                                        Configuration conf) {
+    if (!CompressionEmulationUtil.isInputCompressionEmulationEnabled(conf)) {
+      return possiblyCompressedInputBytes;
+    }
+    // Divide in double: "long /= float" is evaluated in float precision,
+    // which cannot represent every byte count above 2^24 exactly.
+    double inputCompressionRatio =
+        CompressionEmulationUtil.getMapInputCompressionEmulationRatio(conf);
+    return (long) (possiblyCompressedInputBytes / inputCompressionRatio);
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java?rev=1297052&r1=1297051&r2=1297052&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java Mon Mar  5 13:44:01 2012
@@ -627,9 +627,14 @@ class LoadJob extends GridmixJob {
         }
       }
       final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+      long possiblyCompressedInputBytes = info.getInputBytes();
+      Configuration conf = job.getConfiguration();
+      long uncompressedInputBytes =
+          CompressionEmulationUtil.getUncompressedInputBytes(
+          possiblyCompressedInputBytes, conf);
       splits.add(
-        new LoadSplit(striper.splitFor(inputDir, info.getInputBytes(), 3), 
-                      maps, i, info.getInputBytes(), info.getInputRecords(),
+        new LoadSplit(striper.splitFor(inputDir, uncompressedInputBytes, 3), 
+                      maps, i, uncompressedInputBytes, info.getInputRecords(),
                       info.getOutputBytes(), info.getOutputRecords(),
                       reduceByteRatio, reduceRecordRatio, specBytes, 
                       specRecords, info.getResourceUsageMetrics(),
@@ -637,4 +642,4 @@ class LoadJob extends GridmixJob {
     }
     pushDescription(id(), splits);
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java?rev=1297052&r1=1297051&r2=1297052&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java Mon Mar  5 13:44:01 2012
@@ -19,7 +19,6 @@ package org.apache.hadoop.mapred.gridmix
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
-import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -31,13 +30,11 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Utils;
@@ -561,4 +558,30 @@ public class TestCompressionEmulationUti
     String readLine = new String(bytes);
     assertEquals("Compression/Decompression error", inputLine, readLine);
   }
+
+  /**
+   * Tests the computation logic of uncompressed input bytes by
+   * {@link CompressionEmulationUtil#getUncompressedInputBytes(long, Configuration)}
+   */
+  @Test
+  public void testComputeUncompressedInputBytes() {
+    long possiblyCompressedInputBytes = 100000;
+    float compressionRatio = 0.45F;
+    Configuration conf = new Configuration();
+    CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf,
+        compressionRatio);
+
+    // By default, input compression emulation is disabled. Verify the
+    // computation of uncompressed input bytes.
+    long result = CompressionEmulationUtil.getUncompressedInputBytes(
+        possiblyCompressedInputBytes, conf);
+    assertEquals(possiblyCompressedInputBytes, result);
+
+    // Enable input compression emulation and verify uncompressed
+    // input bytes computation logic
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    result = CompressionEmulationUtil.getUncompressedInputBytes(
+        possiblyCompressedInputBytes, conf);
+    assertEquals((long)(possiblyCompressedInputBytes/compressionRatio), result);
+  }
 }