You are viewing a plain-text version of this content. The canonical (HTML) version is available in the mailing-list archive; its hyperlink was lost in this plain-text rendering.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2010/06/04 03:45:28 UTC

svn commit: r951233 - in /hadoop/mapreduce/trunk: CHANGES.txt src/test/mapred/org/apache/hadoop/fs/AccumulatingReducer.java src/test/mapred/org/apache/hadoop/fs/TestDFSIO.java

Author: shv
Date: Fri Jun  4 01:45:28 2010
New Revision: 951233

URL: http://svn.apache.org/viewvc?rev=951233&view=rev
Log:
MAPREDUCE-1832. Allow file sizes less than 1MB in DFSIO benchmark. Contributed by Konstantin Shvachko.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/AccumulatingReducer.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/TestDFSIO.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=951233&r1=951232&r2=951233&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jun  4 01:45:28 2010
@@ -1645,3 +1645,5 @@ Release 0.21.0 - Unreleased
     MAPREDUCE-913. TaskRunner crashes with NPE resulting in held up slots,
     UNINITIALIZED tasks and hung TaskTracker. (Amareshwari Sriramadasu and
     Sreekanth Ramakrishnan via vinodkv)
+
+    MAPREDUCE-1832. Allow file sizes less than 1MB in DFSIO benchmark. (shv)
\ No newline at end of file

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/AccumulatingReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/AccumulatingReducer.java?rev=951233&r1=951232&r2=951233&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/AccumulatingReducer.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/AccumulatingReducer.java Fri Jun  4 01:45:28 2010
@@ -52,7 +52,6 @@ public class AccumulatingReducer extends
   protected String hostName;
   
   public AccumulatingReducer () {
-    LOG.info("Starting AccumulatingReducer !!!");
     try {
       hostName = java.net.InetAddress.getLocalHost().getHostName();
     } catch(Exception e) {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/TestDFSIO.java?rev=951233&r1=951232&r2=951233&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/TestDFSIO.java Fri Jun  4 01:45:28 2010
@@ -34,6 +34,7 @@ import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -41,6 +42,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Distributed i/o benchmark.
@@ -69,7 +72,7 @@ import org.apache.hadoop.util.StringUtil
  * <li>standard deviation of i/o rate </li>
  * </ul>
  */
-public class TestDFSIO extends TestCase {
+public class TestDFSIO extends TestCase implements Tool {
   // Constants
   private static final Log LOG = LogFactory.getLog(TestDFSIO.class);
   private static final int TEST_TYPE_READ = 0;
@@ -79,18 +82,80 @@ public class TestDFSIO extends TestCase 
   private static final int DEFAULT_BUFFER_SIZE = 1000000;
   private static final String BASE_FILE_NAME = "test_io_";
   private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
-  
-  private static final long MEGA = 0x100000;
-  private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO");
-  private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control");
-  private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write");
-  private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
-  private static Path APPEND_DIR = new Path(TEST_ROOT_DIR, "io_append");
-  private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data");
+  private static final long MEGA = ByteMultiple.MB.value();
+  private static final String USAGE =
+                            "Usage: " + TestDFSIO.class.getSimpleName() +
+                            " [genericOptions]" +
+                            " -read | -write | -append | -clean [-nrFiles N]" +
+                            " [-fileSize Size[B|KB|MB|GB|TB]]" +
+                            " [-resFile resultFileName] [-bufferSize Bytes]" +
+                            " [-rootDir]";
+
+  private Configuration config;
 
   static{
     Configuration.addDefaultResource("hdfs-default.xml");
     Configuration.addDefaultResource("hdfs-site.xml");
+    Configuration.addDefaultResource("mapred-default.xml");
+    Configuration.addDefaultResource("mapred-site.xml");
+  }
+
+  static enum ByteMultiple {
+    B(1L),
+    KB(0x400L),
+    MB(0x100000L),
+    GB(0x40000000L),
+    TB(0x10000000000L);
+
+    private long multiplier;
+
+    private ByteMultiple(long mult) {
+      multiplier = mult;
+    }
+
+    long value() {
+      return multiplier;
+    }
+
+    static ByteMultiple parseString(String sMultiple) {
+      if(sMultiple == null || sMultiple.isEmpty()) // MB by default
+        return MB;
+      String sMU = sMultiple.toUpperCase();
+      if(B.name().toUpperCase().endsWith(sMU))
+        return B;
+      if(KB.name().toUpperCase().endsWith(sMU))
+        return KB;
+      if(MB.name().toUpperCase().endsWith(sMU))
+        return MB;
+      if(GB.name().toUpperCase().endsWith(sMU))
+        return GB;
+      if(TB.name().toUpperCase().endsWith(sMU))
+        return TB;
+      throw new IllegalArgumentException("Unsupported ByteMultiple "+sMultiple);
+    }
+  }
+
+  public TestDFSIO() {
+    this.config = new Configuration();
+  }
+
+  private static String getBaseDir(Configuration conf) {
+    return conf.get("test.build.data","/benchmarks/TestDFSIO");
+  }
+  private static Path getControlDir(Configuration conf) {
+    return new Path(getBaseDir(conf), "io_control");
+  }
+  private static Path getWriteDir(Configuration conf) {
+    return new Path(getBaseDir(conf), "io_write");
+  }
+  private static Path getReadDir(Configuration conf) {
+    return new Path(getBaseDir(conf), "io_read");
+  }
+  private static Path getAppendDir(Configuration conf) {
+    return new Path(getBaseDir(conf), "io_append");
+  }
+  private static Path getDataDir(Configuration conf) {
+    return new Path(getBaseDir(conf), "io_data");
   }
 
   /**
@@ -99,7 +164,8 @@ public class TestDFSIO extends TestCase 
    * @throws Exception
    */
   public void testIOs() throws Exception {
-    testIOs(1, 4, new Configuration());
+    TestDFSIO bench = new TestDFSIO();
+    bench.testIOs(1, 4);
   }
 
   /**
@@ -109,27 +175,27 @@ public class TestDFSIO extends TestCase 
    * @param nrFiles number of files
    * @throws IOException
    */
-  public static void testIOs(int fileSize, int nrFiles, Configuration fsConfig)
+  public void testIOs(int fileSize, int nrFiles)
     throws IOException {
-    fsConfig.setBoolean("dfs.support.append", true);
+    config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
     MiniDFSCluster cluster = null;
     try {
-      cluster = new MiniDFSCluster(fsConfig, 2, true, null);
+      cluster = new MiniDFSCluster(config, 2, true, null);
       FileSystem fs = cluster.getFileSystem();
-  
-      createControlFile(fs, fileSize, nrFiles, fsConfig);
+
+      createControlFile(fs, fileSize, nrFiles);
       long tStart = System.currentTimeMillis();
-      writeTest(fs, fsConfig);
+      writeTest(fs);
       long execTime = System.currentTimeMillis() - tStart;
       analyzeResult(fs, TEST_TYPE_WRITE, execTime, DEFAULT_RES_FILE_NAME);
 
       tStart = System.currentTimeMillis();
-      readTest(fs, fsConfig);
+      readTest(fs);
       execTime = System.currentTimeMillis() - tStart;
       analyzeResult(fs, TEST_TYPE_READ, execTime, DEFAULT_RES_FILE_NAME);
 
       tStart = System.currentTimeMillis();
-      appendTest(fs, fsConfig);
+      appendTest(fs);
       execTime = System.currentTimeMillis() - tStart;
       analyzeResult(fs, TEST_TYPE_APPEND, execTime, DEFAULT_RES_FILE_NAME);
 
@@ -139,21 +205,21 @@ public class TestDFSIO extends TestCase 
     }
   }
 
-  private static void createControlFile(FileSystem fs,
-                                        int fileSize, // in MB 
-                                        int nrFiles,
-                                        Configuration fsConfig
-                                        ) throws IOException {
-    LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files");
+  private void createControlFile(FileSystem fs,
+                                  long fileSize, // in bytes
+                                  int nrFiles
+                                ) throws IOException {
+    LOG.info("creating control file: "+fileSize+" bytes, "+nrFiles+" files");
 
-    fs.delete(CONTROL_DIR, true);
+    Path controlDir = getControlDir(config);
+    fs.delete(controlDir, true);
 
     for(int i=0; i < nrFiles; i++) {
       String name = getFileName(i);
-      Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
+      Path controlFile = new Path(controlDir, "in_file_" + name);
       SequenceFile.Writer writer = null;
       try {
-        writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
+        writer = SequenceFile.createWriter(fs, config, controlFile,
                                            Text.class, LongWritable.class,
                                            CompressionType.NONE);
         writer.append(new Text(name), new LongWritable(fileSize));
@@ -221,14 +287,14 @@ public class TestDFSIO extends TestCase 
         buffer[i] = (byte)('0' + i % 50);
     }
 
+    @Override
     public Long doIO(Reporter reporter, 
                        String name, 
-                       long totalSize 
-                       ) throws IOException {
+                       long totalSize // in bytes
+                     ) throws IOException {
       // create file
-      totalSize *= MEGA;
       OutputStream out;
-      out = fs.create(new Path(DATA_DIR, name), true, bufferSize);
+      out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
       
       try {
         // write to the file
@@ -247,23 +313,21 @@ public class TestDFSIO extends TestCase 
     }
   }
 
-  private static void writeTest(FileSystem fs, Configuration fsConfig)
-  throws IOException {
-
-    fs.delete(DATA_DIR, true);
-    fs.delete(WRITE_DIR, true);
+  private void writeTest(FileSystem fs) throws IOException {
+    Path writeDir = getWriteDir(config);
+    fs.delete(getDataDir(config), true);
+    fs.delete(writeDir, true);
     
-    runIOTest(WriteMapper.class, WRITE_DIR, fsConfig);
+    runIOTest(WriteMapper.class, writeDir);
   }
   
   @SuppressWarnings("deprecation")
-  private static void runIOTest(
+  private void runIOTest(
           Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass, 
-          Path outputDir,
-          Configuration fsConfig) throws IOException {
-    JobConf job = new JobConf(fsConfig, TestDFSIO.class);
+          Path outputDir) throws IOException {
+    JobConf job = new JobConf(config, TestDFSIO.class);
 
-    FileInputFormat.setInputPaths(job, CONTROL_DIR);
+    FileInputFormat.setInputPaths(job, getControlDir(config));
     job.setInputFormat(SequenceFileInputFormat.class);
 
     job.setMapperClass(mapperClass);
@@ -288,12 +352,11 @@ public class TestDFSIO extends TestCase 
 
     public Long doIO(Reporter reporter, 
                        String name, 
-                       long totalSize 
-                       ) throws IOException {
+                       long totalSize // in bytes
+                     ) throws IOException {
       // create file
-      totalSize *= MEGA;
       OutputStream out;
-      out = fs.append(new Path(DATA_DIR, name), bufferSize);
+      out = fs.append(new Path(getDataDir(getConf()), name), bufferSize);
       
       try {
         // write to the file
@@ -312,10 +375,10 @@ public class TestDFSIO extends TestCase 
     }
   }
 
-  private static void appendTest(FileSystem fs, Configuration fsConfig)
-  throws IOException {
-    fs.delete(APPEND_DIR, true);
-    runIOTest(AppendMapper.class, APPEND_DIR, fsConfig);
+  private void appendTest(FileSystem fs) throws IOException {
+    Path appendDir = getAppendDir(config);
+    fs.delete(appendDir, true);
+    runIOTest(AppendMapper.class, appendDir);
   }
 
   /**
@@ -328,15 +391,16 @@ public class TestDFSIO extends TestCase 
 
     public Long doIO(Reporter reporter, 
                        String name, 
-                       long totalSize 
-                       ) throws IOException {
-      totalSize *= MEGA;
+                       long totalSize // in bytes
+                     ) throws IOException {
       // open file
-      DataInputStream in = fs.open(new Path(DATA_DIR, name));
+      DataInputStream in = fs.open(new Path(getDataDir(getConf()), name));
+      long actualSize = 0;
       try {
-        long actualSize = 0;
-        for(int curSize = bufferSize; curSize == bufferSize;) {
+        for(int curSize = bufferSize;
+                curSize == bufferSize && actualSize < totalSize;) {
           curSize = in.read(buffer, 0, bufferSize);
+          if(curSize < 0) break;
           actualSize += curSize;
           reporter.setStatus("reading " + name + "@" + 
                              actualSize + "/" + totalSize 
@@ -345,21 +409,21 @@ public class TestDFSIO extends TestCase 
       } finally {
         in.close();
       }
-      return Long.valueOf(totalSize);
+      return Long.valueOf(actualSize);
     }
   }
 
-  private static void readTest(FileSystem fs, Configuration fsConfig)
-  throws IOException {
-    fs.delete(READ_DIR, true);
-    runIOTest(ReadMapper.class, READ_DIR, fsConfig);
+  private void readTest(FileSystem fs) throws IOException {
+    Path readDir = getReadDir(config);
+    fs.delete(readDir, true);
+    runIOTest(ReadMapper.class, readDir);
   }
 
-  private static void sequentialTest(FileSystem fs, 
-                                     int testType, 
-                                     int fileSize, 
-                                     int nrFiles
-                                     ) throws Exception {
+  private void sequentialTest(FileSystem fs, 
+                              int testType, 
+                              long fileSize, // in bytes
+                              int nrFiles
+                             ) throws IOException {
     IOStatMapper<Long> ioer = null;
     if (testType == TEST_TYPE_READ)
       ioer = new ReadMapper();
@@ -372,28 +436,39 @@ public class TestDFSIO extends TestCase 
     for(int i=0; i < nrFiles; i++)
       ioer.doIO(Reporter.NULL,
                 BASE_FILE_NAME+Integer.toString(i), 
-                MEGA*fileSize);
+                fileSize);
   }
 
   public static void main(String[] args) {
+    TestDFSIO bench = new TestDFSIO();
+    int res = -1;
+    try {
+      res = ToolRunner.run(bench, args);
+    } catch(Exception e) {
+      System.err.print(StringUtils.stringifyException(e));
+      res = -2;
+    }
+    if(res == -1)
+      System.err.print(USAGE);
+    System.exit(res);
+  }
+
+  @Override // Tool
+  public int run(String[] args) throws IOException {
     int testType = TEST_TYPE_READ;
     int bufferSize = DEFAULT_BUFFER_SIZE;
-    int fileSize = 1;
+    long fileSize = 1*MEGA;
     int nrFiles = 1;
     String resFileName = DEFAULT_RES_FILE_NAME;
     boolean isSequential = false;
-    
-    String className = TestDFSIO.class.getSimpleName();
-    String version = className + ".0.0.5";
-    String usage = "Usage: " + className +
-            " -read | -write | -append | -clean [-nrFiles N] [-fileSize MB]" +
-            " [-resFile resultFileName] [-bufferSize Bytes]";
+    String version = TestDFSIO.class.getSimpleName() + ".0.0.6";
 
-    System.out.println(version);
+    LOG.info(version);
     if (args.length == 0) {
-      System.err.println(usage);
-      System.exit(-1);
+      System.err.println("Missing arguments.");
+      return -1;
     }
+
     for (int i = 0; i < args.length; i++) {       // parse command line
       if (args[i].startsWith("-read")) {
         testType = TEST_TYPE_READ;
@@ -408,65 +483,92 @@ public class TestDFSIO extends TestCase 
       } else if (args[i].equals("-nrFiles")) {
         nrFiles = Integer.parseInt(args[++i]);
       } else if (args[i].equals("-fileSize")) {
-        fileSize = Integer.parseInt(args[++i]);
+        fileSize = parseSize(args[++i]);
       } else if (args[i].equals("-bufferSize")) {
         bufferSize = Integer.parseInt(args[++i]);
       } else if (args[i].equals("-resFile")) {
         resFileName = args[++i];
+      } else {
+        System.err.println("Illegal argument: " + args[i]);
+        return -1;
       }
     }
 
     LOG.info("nrFiles = " + nrFiles);
-    LOG.info("fileSize (MB) = " + fileSize);
+    LOG.info("fileSize (MB) = " + toMB(fileSize));
     LOG.info("bufferSize = " + bufferSize);
-  
-    try {
-      Configuration fsConfig = new Configuration();
-      fsConfig.setInt("test.io.file.buffer.size", bufferSize);
-      fsConfig.setBoolean("dfs.support.append", true);
-      FileSystem fs = FileSystem.get(fsConfig);
-
-      if (isSequential) {
-        long tStart = System.currentTimeMillis();
-        sequentialTest(fs, testType, fileSize, nrFiles);
-        long execTime = System.currentTimeMillis() - tStart;
-        String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
-        LOG.info(resultLine);
-        return;
-      }
-      if (testType == TEST_TYPE_CLEANUP) {
-        cleanup(fs);
-        return;
-      }
-      createControlFile(fs, fileSize, nrFiles, fsConfig);
+    LOG.info("baseDir = " + getBaseDir(config));
+
+    config.setInt("test.io.file.buffer.size", bufferSize);
+    config.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
+    FileSystem fs = FileSystem.get(config);
+
+    if (isSequential) {
       long tStart = System.currentTimeMillis();
-      if (testType == TEST_TYPE_WRITE)
-        writeTest(fs, fsConfig);
-      if (testType == TEST_TYPE_READ)
-        readTest(fs, fsConfig);
-      if (testType == TEST_TYPE_APPEND)
-        appendTest(fs, fsConfig);
+      sequentialTest(fs, testType, fileSize, nrFiles);
       long execTime = System.currentTimeMillis() - tStart;
-    
-      analyzeResult(fs, testType, execTime, resFileName);
-    } catch(Exception e) {
-      System.err.print(StringUtils.stringifyException(e));
-      System.exit(-1);
+      String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
+      LOG.info(resultLine);
+      return 0;
     }
-  }
+    if (testType == TEST_TYPE_CLEANUP) {
+      cleanup(fs);
+      return 0;
+    }
+    createControlFile(fs, fileSize, nrFiles);
+    long tStart = System.currentTimeMillis();
+    if (testType == TEST_TYPE_WRITE)
+      writeTest(fs);
+    if (testType == TEST_TYPE_READ)
+      readTest(fs);
+    if (testType == TEST_TYPE_APPEND)
+      appendTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
   
-  private static void analyzeResult( FileSystem fs, 
-                                     int testType,
-                                     long execTime,
-                                     String resFileName
-                                     ) throws IOException {
+    analyzeResult(fs, testType, execTime, resFileName);
+    return 0;
+  }
+
+  @Override // Configurable
+  public Configuration getConf() {
+    return this.config;
+  }
+
+  @Override // Configurable
+  public void setConf(Configuration conf) {
+    this.config = conf;
+  }
+
+  /**
+   * Returns size in bytes.
+   * 
+   * @param arg = {d}[B|KB|MB|GB|TB]
+   * @return
+   */
+  static long parseSize(String arg) {
+    String[] args = arg.split("\\D", 2);  // get digits
+    assert args.length <= 2;
+    long fileSize = Long.parseLong(args[0]);
+    String bytesMult = arg.substring(args[0].length()); // get byte multiple
+    return fileSize * ByteMultiple.parseString(bytesMult).value();
+  }
+
+  static float toMB(long bytes) {
+    return ((float)bytes)/MEGA;
+  }
+
+  private void analyzeResult(	FileSystem fs,
+                              int testType,
+                              long execTime,
+                              String resFileName
+                            ) throws IOException {
     Path reduceFile;
     if (testType == TEST_TYPE_WRITE)
-      reduceFile = new Path(WRITE_DIR, "part-00000");
+      reduceFile = new Path(getWriteDir(config), "part-00000");
     else if (testType == TEST_TYPE_APPEND)
-      reduceFile = new Path(APPEND_DIR, "part-00000");
+      reduceFile = new Path(getAppendDir(config), "part-00000");
     else // if (testType == TEST_TYPE_READ)
-      reduceFile = new Path(READ_DIR, "part-00000");
+      reduceFile = new Path(getReadDir(config), "part-00000");
     long tasks = 0;
     long size = 0;
     long time = 0;
@@ -506,7 +608,7 @@ public class TestDFSIO extends TestCase 
                                     "unknown"),
       "           Date & time: " + new Date(System.currentTimeMillis()),
       "       Number of files: " + tasks,
-      "Total MBytes processed: " + size/MEGA,
+      "Total MBytes processed: " + toMB(size),
       "     Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
       "Average IO rate mb/sec: " + med,
       " IO rate std deviation: " + stdDev,
@@ -525,8 +627,9 @@ public class TestDFSIO extends TestCase 
     }
   }
 
-  private static void cleanup(FileSystem fs) throws IOException {
+  private void cleanup(FileSystem fs)
+  throws IOException {
     LOG.info("Cleaning up test files");
-    fs.delete(new Path(TEST_ROOT_DIR), true);
+    fs.delete(new Path(getBaseDir(config)), true);
   }
 }