Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/04 06:39:18 UTC

svn commit: r399543 - in /lucene/hadoop/trunk: ./ src/test/org/apache/hadoop/fs/ src/test/org/apache/hadoop/test/

Author: cutting
Date: Wed May  3 21:39:16 2006
New Revision: 399543

URL: http://svn.apache.org/viewcvs?rev=399543&view=rev
Log:
HADOOP-193 & HADOOP-194.  A filesystem benchmark and a filesystem checker.  Contributed by Konstantin.

Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=399543&r1=399542&r2=399543&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed May  3 21:39:16 2006
@@ -177,6 +177,13 @@
     WritableComparable, but is not yet otherwise used by
     Hadoop. (Milind Bhandarkar via cutting)
 
+46. HADOOP-193.  Add a MapReduce-based FileSystem benchmark.
+    (Konstantin Shvachko via cutting)
+
+47. HADOOP-194.  Add a MapReduce-based FileSystem checker.  This reads
+    every block in every file in the filesystem.  (Konstantin Shvachko
+    via cutting)
+
 
 Release 0.1.1 - 2006-04-08
 

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java?rev=399543&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java Wed May  3 21:39:16 2006
@@ -0,0 +1,81 @@
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskTracker;
+
+/**
+ * Reducer that accumulates values based on their type.
+ * <p>
+ * The type is specified as a prefix of the key
+ * of the key-value pair, in the following form:
+ * <p>
+ * <tt>type:key</tt>
+ * <p>
+ * The values are accumulated according to the types:
+ * <ul>
+ * <li><tt>s:</tt> - string, concatenate</li>
+ * <li><tt>f:</tt> - float, sum</li>
+ * <li><tt>l:</tt> - long, sum</li>
+ * </ul>
+ * 
+ * @author Konstantin Shvachko
+ */
+public class AccumulatingReducer extends MapReduceBase implements Reducer {
+  protected String hostName;
+  
+  public AccumulatingReducer () {
+    TaskTracker.LOG.info("Starting AccumulatingReducer !!!");
+    try {
+      hostName = java.net.InetAddress.getLocalHost().getHostName();
+    } catch(Exception e) {
+      hostName = "localhost";
+    }
+    TaskTracker.LOG.info("Starting AccumulatingReducer on " + hostName);
+  }
+  
+  public void reduce( WritableComparable key, 
+                      Iterator values,
+                      OutputCollector output, 
+                      Reporter reporter
+                      ) throws IOException {
+    String field = ((UTF8) key).toString();
+
+    reporter.setStatus("starting " + field + " ::host = " + hostName);
+
+    // concatenate strings
+    if (field.startsWith("s:")) {
+      String sSum = "";
+      while (values.hasNext())
+        sSum += ((UTF8) values.next()).toString() + ";";
+      output.collect(key, new UTF8(sSum));
+      reporter.setStatus("finished " + field + " ::host = " + hostName);
+      return;
+    }
+    // sum float values
+    if (field.startsWith("f:")) {
+      float fSum = 0;
+      while (values.hasNext())
+        fSum += Float.parseFloat(((UTF8) values.next()).toString());
+      output.collect(key, new UTF8(String.valueOf(fSum)));
+      reporter.setStatus("finished " + field + " ::host = " + hostName);
+      return;
+    }
+    // sum long values
+    if (field.startsWith("l:")) {
+      long lSum = 0;
+      while (values.hasNext()) {
+        lSum += Long.parseLong(((UTF8) values.next()).toString());
+      }
+      output.collect(key, new UTF8(String.valueOf(lSum)));
+    }
+    reporter.setStatus("finished " + field + " ::host = " + hostName);
+  }
+}
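
For reference, here is a self-contained sketch of how the three key prefixes behave, driving the reducer by hand. It is illustrative only and not part of this commit; the key names and host strings are made up.

  import java.util.Arrays;
  import org.apache.hadoop.fs.AccumulatingReducer;
  import org.apache.hadoop.io.UTF8;
  import org.apache.hadoop.io.Writable;
  import org.apache.hadoop.io.WritableComparable;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reporter;

  public class AccumulatingReducerExample {
    public static void main(String[] args) throws Exception {
      AccumulatingReducer reducer = new AccumulatingReducer();
      // Collector that just prints the accumulated key-value pairs.
      OutputCollector printer = new OutputCollector() {
        public void collect(WritableComparable key, Writable value) {
          System.out.println(key + " -> " + value);
        }
      };
      // No-op reporter, as in the sequential test code in TestDFSIO.
      Reporter quiet = new Reporter() {
        public void setStatus(String status) {}
      };
      // "l:" values are summed as longs: prints "l:tasks -> 3"
      reducer.reduce(new UTF8("l:tasks"),
          Arrays.asList(new UTF8[] {new UTF8("1"), new UTF8("1"), new UTF8("1")}).iterator(),
          printer, quiet);
      // "s:" values are concatenated: prints "s:hosts -> host1;host2;"
      reducer.reduce(new UTF8("s:hosts"),
          Arrays.asList(new UTF8[] {new UTF8("host1"), new UTF8("host2")}).iterator(),
          printer, quiet);
    }
  }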

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java?rev=399543&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java Wed May  3 21:39:16 2006
@@ -0,0 +1,324 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.*;
+
+import junit.framework.TestCase;
+import java.util.logging.*;
+import java.util.Date;
+import java.util.StringTokenizer;
+import java.util.TreeSet;
+import java.util.Vector;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.*;
+
+/**
+ * Distributed check of file system consistency.
+ * <p>
+ * Test file system consistency by reading each block of each file
+ * of the specified file tree. 
+ * Report corrupted blocks and general file statistics.
+ * <p>
+ * Optionally displays statistics on read performance.
+ * 
+ * @author Konstantin Shvachko
+ */
+public class DistributedFSCheck extends TestCase {
+  // Constants
+  private static final int TEST_TYPE_READ = 0;
+  private static final int TEST_TYPE_CLEANUP = 2;
+  private static final int DEFAULT_BUFFER_SIZE = 1000000;
+  private static final String DEFAULT_RES_FILE_NAME = "DistributedFSCheck_results.log";
+  private static final long MEGA = 0x100000;
+  
+  private static Configuration fsConfig = new Configuration();
+  private static final Logger LOG = InputFormatBase.LOG;
+  private static Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/benchmarks/DistributedFSCheck"));
+  private static Path MAP_INPUT_DIR = new Path(TEST_ROOT_DIR, "map_input");
+  private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
+
+  private FileSystem fs;
+  private long nrFiles;
+  
+  DistributedFSCheck(Configuration conf) throws Exception {
+    fsConfig = conf;
+    this.fs = FileSystem.get( conf );
+  }
+
+  /**
+   * Run distributed check over the entire file system.
+   * 
+   * @throws Exception
+   */
+  public void testFSBlocks() throws Exception {
+    testFSBlocks("/");
+  }
+
+  /**
+   * Run distributed checkup for the specified directory.
+   * 
+   * @param rootName root directory name
+   * @throws Exception
+   */
+  public void testFSBlocks( String rootName ) throws Exception {
+    createInputFile(rootName);
+    runDistributedFSCheck();
+  }
+
+  private void createInputFile( String rootName ) throws IOException {
+    fs.delete(MAP_INPUT_DIR);
+
+    Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
+    SequenceFile.Writer writer =
+      new SequenceFile.Writer(fs, inputFile, UTF8.class, LongWritable.class);
+    
+    try {
+      nrFiles = 0;
+      listSubtree( new Path( rootName ), writer );
+    } finally {
+      writer.close();
+    }
+    LOG.info( "Created map input files." );
+  }
+  
+  private void listSubtree( Path rootFile,
+                            SequenceFile.Writer writer
+                            ) throws IOException {
+    if( ! fs.isDirectory(rootFile) ) {
+      nrFiles++;
+      // For a regular file generate <fName,offset> pairs
+      long blockSize = fs.getBlockSize();
+      long fileLength = fs.getLength( rootFile );
+      for( long offset = 0; offset < fileLength; offset += blockSize )
+        writer.append(new UTF8(rootFile.toString()), new LongWritable(offset));
+      return;
+    }
+    
+    Path children[] = fs.listPaths( rootFile );
+    if( children == null )
+      throw new IOException("Could not get listing for " + rootFile);
+    for (int i = 0; i < children.length; i++)
+      listSubtree( children[i], writer );
+  }
+
+  /**
+   * DistributedFSCheck mapper class.
+   */
+  public static class DistributedFSCheckMapper extends IOMapperBase {
+
+    public DistributedFSCheckMapper() { 
+      super(fsConfig); 
+    }
+
+    public Object doIO( Reporter reporter, 
+                        String name, 
+                        long offset 
+                        ) throws IOException {
+      // open file
+      FSDataInputStream in = fs.open(new Path(name));
+      in.seek(offset);
+      long actualSize = 0;
+      try {
+        long blockSize = fs.getBlockSize();
+        int curSize = bufferSize;
+        for(  actualSize = 0; 
+              curSize == bufferSize && actualSize < blockSize;
+              actualSize += curSize) {
+          curSize = in.read( buffer, 0, Math.min(bufferSize, (int)(blockSize - actualSize)) );
+          reporter.setStatus( "reading " + name + "@" + 
+                              offset + "/" + blockSize );
+        }
+      } catch( IOException e ) {
+        LOG.info( "Corrupted block detected in \"" + name + "\" at " + offset );
+        return name + "@" + offset;
+      } finally {
+        in.close();
+      }
+      return new Long( actualSize );
+    }
+    
+    void collectStats(OutputCollector output, 
+                      String name, 
+                      long execTime, 
+                      Object corruptedBlock ) throws IOException {
+      output.collect(new UTF8("l:blocks"), new UTF8(String.valueOf(1)));
+
+      if( corruptedBlock instanceof String ) {
+        output.collect(new UTF8("s:badBlocks"), new UTF8((String)corruptedBlock));
+        return;
+      }
+      long totalSize = ((Long)corruptedBlock).longValue();
+      float ioRateMbSec = (float)totalSize * 1000 / (execTime * 0x100000);
+      LOG.info( "Number of bytes processed = " + totalSize );
+      LOG.info( "Exec time = " + execTime );
+      LOG.info( "IO rate = " + ioRateMbSec );
+      
+      output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
+      output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
+      output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
+    }
+  }
+  
+  private void runDistributedFSCheck() throws Exception {
+    fs.delete(READ_DIR);
+    JobConf job = new JobConf( fs.getConf(), DistributedFSCheck.class );
+
+    job.setInputPath(MAP_INPUT_DIR);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(LongWritable.class);
+
+    job.setMapperClass(DistributedFSCheckMapper.class);
+    job.setReducerClass(AccumulatingReducer.class);
+
+    job.setOutputPath(READ_DIR);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(UTF8.class);
+    job.setNumReduceTasks(1);
+    JobClient.runJob(job);
+  }
+
+  public static void main(String[] args) throws Exception {
+    int testType = TEST_TYPE_READ;
+    int bufferSize = DEFAULT_BUFFER_SIZE;
+    String resFileName = DEFAULT_RES_FILE_NAME;
+    String rootName = "/";
+    boolean viewStats = false;
+
+    String usage = "Usage: DistributedFSCheck [-root name] [-clean] [-resFile resultFileName] [-bufferSize Bytes] [-stats] ";
+    
+    if(args.length == 1 && args[0].startsWith("-h")) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+    for(int i = 0; i < args.length; i++) {       // parse command line
+      if(args[i].equals("-root")) {
+        rootName = args[++i];
+      } else if (args[i].startsWith("-clean")) {
+        testType = TEST_TYPE_CLEANUP;
+      } else if(args[i].equals("-bufferSize")) {
+        bufferSize = Integer.parseInt(args[++i]);
+      } else if(args[i].equals("-resFile")) {
+        resFileName = args[++i];
+      } else if(args[i].startsWith("-stat")) {
+        viewStats = true;
+      }
+    }
+
+    LOG.info("root = " + rootName);
+    LOG.info("bufferSize = " + bufferSize);
+  
+    Configuration conf = new Configuration();  
+    conf.setInt("test.io.file.buffer.size", bufferSize);
+    DistributedFSCheck test = new DistributedFSCheck( conf );
+
+    if( testType == TEST_TYPE_CLEANUP ) {
+      test.cleanup();
+      return;
+    }
+    test.createInputFile( rootName );
+    long tStart = System.currentTimeMillis();
+    test.runDistributedFSCheck();
+    long execTime = System.currentTimeMillis() - tStart;
+    
+    test.analyzeResult( execTime, resFileName, viewStats );
+  }
+  
+  private void analyzeResult( long execTime,
+                              String resFileName,
+                              boolean viewStats
+                              ) throws IOException {
+    Path reduceFile= new Path( READ_DIR, "part-00000" );
+    DataInputStream in;
+    in = new DataInputStream(fs.open( reduceFile ));
+  
+    BufferedReader lines;
+    lines = new BufferedReader(new InputStreamReader(in));
+    long blocks = 0;
+    long size = 0;
+    long time = 0;
+    float rate = 0;
+    StringTokenizer  badBlocks = null;
+    long nrBadBlocks = 0;
+    String line;
+    while( (line = lines.readLine()) != null ) {
+      StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
+      String attr = tokens.nextToken(); 
+      if( attr.endsWith("blocks") )
+        blocks = Long.parseLong( tokens.nextToken() );
+      else if( attr.endsWith("size") )
+        size = Long.parseLong( tokens.nextToken() );
+      else if( attr.endsWith("time") )
+        time = Long.parseLong( tokens.nextToken() );
+      else if( attr.endsWith("rate") )
+        rate = Float.parseFloat( tokens.nextToken() );
+      else if( attr.endsWith("badBlocks") ) {
+        badBlocks = new StringTokenizer(tokens.nextToken(), ";");
+        nrBadBlocks = badBlocks.countTokens();
+      }
+    }
+    
+    Vector resultLines = new Vector();
+    resultLines.add(  "----- DistributedFSCheck ----- : " );
+    resultLines.add(  "               Date & time: " + new Date(System.currentTimeMillis()) );
+    resultLines.add(  "    Total number of blocks: " + blocks );
+    resultLines.add(  "    Total number of  files: " + nrFiles );
+    resultLines.add(  "Number of corrupted blocks: " + nrBadBlocks );
+    
+    int nrBadFilesPos = resultLines.size();
+    TreeSet badFiles = new TreeSet();
+    long nrBadFiles = 0;
+    if( nrBadBlocks > 0 ) {
+      resultLines.add("" );
+      resultLines.add("----- Corrupted Blocks (file@offset) ----- : ");
+      while( badBlocks.hasMoreTokens() ) {
+        String curBlock = badBlocks.nextToken();
+        resultLines.add( curBlock );
+        badFiles.add( curBlock.substring(0, curBlock.indexOf('@')) );
+      }
+      nrBadFiles = badFiles.size();
+    }
+    
+    resultLines.insertElementAt( " Number of corrupted files: " + nrBadFiles, nrBadFilesPos );
+    
+    if( viewStats ) {
+      resultLines.add("" );
+      resultLines.add("-----   Performance  ----- : " );
+      resultLines.add("         Total MBytes read: " + size/MEGA );
+      resultLines.add("         Throughput mb/sec: " + (float)size * 1000.0 / (time * MEGA) );
+      resultLines.add("    Average IO rate mb/sec: " + rate / 1000 / blocks );
+      resultLines.add("        Test exec time sec: " + (float)execTime / 1000 );
+    }
+
+    PrintStream res = new PrintStream( 
+                                      new FileOutputStream( 
+                                                           new File(resFileName), true )); 
+    for( int i = 0; i < resultLines.size(); i++ ) {
+      String cur = (String)resultLines.get(i);
+      LOG.info( cur );
+      res.println( cur );
+    }
+  }
+
+  private void cleanup() throws Exception {
+    LOG.info( "Cleaning up test files" );
+    fs.delete(TEST_ROOT_DIR);
+  }
+}
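
A note on running it (not part of the commit): DistributedFSCheck is not registered with AllTestDriver in this change, so it has to be started by class name. Assuming the usual bin/hadoop launcher that accepts a fully qualified class name, an invocation matching the usage string above might look like this; the root path and result file name are examples only:

  bin/hadoop org.apache.hadoop.fs.DistributedFSCheck -root /user/data -resFile fscheck_results.log -stats
  bin/hadoop org.apache.hadoop.fs.DistributedFSCheck -clean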

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java?rev=399543&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java Wed May  3 21:39:16 2006
@@ -0,0 +1,113 @@
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Base mapper class for IO operations.
+ * <p>
+ * Two abstract methods, {@link #doIO(Reporter,String,long)} and 
+ * {@link #collectStats(OutputCollector,String,long,Object)}, must be
+ * overridden in derived classes to define the IO operation and the
+ * statistics data to be collected by a subsequent reducer.
+ * 
+ * @author Konstantin Shvachko
+ */
+public abstract class IOMapperBase extends Configured implements Mapper {
+  protected byte[] buffer;
+  protected int bufferSize;
+  protected FileSystem fs;
+  protected String hostName;
+
+  public IOMapperBase(Configuration conf) { 
+    super(conf); 
+    try {
+      fs = FileSystem.get(conf);
+    } catch (Exception e) {
+      throw new RuntimeException( "Cannot create file system.", e );
+    }
+    bufferSize = conf.getInt("test.io.file.buffer.size", 4096);
+    buffer = new byte[bufferSize];
+    try {
+      hostName = InetAddress.getLocalHost().getHostName();
+    } catch(Exception e) {
+      hostName = "localhost";
+    }
+  }
+
+  public void configure(JobConf job) {
+    setConf(job);
+  }
+
+  public void close() throws IOException {
+  }
+  
+  /**
+   * Perform the io operation, usually a read or a write.
+   * 
+   * @param reporter
+   * @param name file name
+   * @param value offset within the file
+   * @return object that is passed as a parameter to 
+   *          {@link #collectStats(OutputCollector,String,long,Object)}
+   * @throws IOException
+   */
+  abstract Object doIO( Reporter reporter, 
+                        String name, 
+                        long value ) throws IOException;
+
+  /**
+   * Collect stat data to be combined by a subsequent reducer.
+   * 
+   * @param output
+   * @param name file name
+   * @param execTime IO execution time
+   * @param doIOReturnValue value returned by {@link #doIO(Reporter,String,long)}
+   * @throws IOException
+   */
+  abstract void collectStats( OutputCollector output, 
+                              String name, 
+                              long execTime, 
+                              Object doIOReturnValue ) throws IOException;
+  
+  /**
+   * Map file name and offset into statistical data.
+   * <p>
+   * The map task receives the 
+   * <tt>key</tt>, which contains the file name, and the 
+   * <tt>value</tt>, which is the offset within the file.
+   * 
+   * Both are passed to the abstract method 
+   * {@link #doIO(Reporter,String,long)}, which performs the io operation, 
+   * usually reading or writing data; then 
+   * {@link #collectStats(OutputCollector,String,long,Object)} 
+   * is called to prepare the stat data for a subsequent reducer.
+   */
+  public void map(WritableComparable key, 
+                  Writable value,
+                  OutputCollector output, 
+                  Reporter reporter) throws IOException {
+    String name = ((UTF8)key).toString();
+    long longValue = ((LongWritable)value).get();
+    
+    reporter.setStatus("starting " + name + " ::host = " + hostName);
+    
+    long tStart = System.currentTimeMillis();
+    Object statValue = doIO( reporter, name, longValue );
+    long tEnd = System.currentTimeMillis();
+    long execTime = tEnd - tStart;
+    collectStats( output, name, execTime, statValue );
+    
+    reporter.setStatus("finished " + name + " ::host = " + hostName);
+  }
+}
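
To illustrate the contract, here is a minimal hypothetical subclass (not part of this commit; the class and key names are invented). It has to live in org.apache.hadoop.fs because doIO and collectStats are package-private, and it emits the typed keys that AccumulatingReducer understands:

  package org.apache.hadoop.fs;

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.UTF8;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reporter;

  public class FileLengthMapper extends IOMapperBase {

    public FileLengthMapper() {
      super(new Configuration());
    }

    // The "io" operation: just measure the file length.
    Object doIO(Reporter reporter, String name, long value) throws IOException {
      return new Long(fs.getLength(new Path(name)));
    }

    // Stats are emitted as typed keys for AccumulatingReducer.
    void collectStats(OutputCollector output, String name,
                      long execTime, Object result) throws IOException {
      output.collect(new UTF8("l:bytes"), new UTF8(result.toString()));
      output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
    }
  }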

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java?rev=399543&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java Wed May  3 21:39:16 2006
@@ -0,0 +1,430 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.*;
+
+import junit.framework.TestCase;
+import java.util.logging.*;
+import java.util.Date;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.*;
+
+/**
+ * Distributed i/o benchmark.
+ * <p>
+ * This test writes into or reads from a specified number of files.
+ * File size is specified as a parameter to the test. 
+ * Each file is accessed in a separate map task.
+ * <p>
+ * The reducer collects the following statistics:
+ * <ul>
+ * <li>number of tasks completed</li>
+ * <li>number of bytes written/read</li>
+ * <li>execution time</li>
+ * <li>io rate</li>
+ * <li>io rate squared</li>
+ * </ul>
+ *    
+ * Finally, the following information is appended to a local file
+ * <ul>
+ * <li>read or write test</li>
+ * <li>date and time the test finished</li>   
+ * <li>number of files</li>
+ * <li>total number of bytes processed</li>
+ * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li>
+ * <li>average i/o rate in mb/sec per file</li>
+ * <li>i/o rate standard deviation</li>
+ * </ul>
+ *
+ * @author Konstantin Shvachko
+ */
+public class TestDFSIO extends TestCase {
+  // Constants
+  private static final int TEST_TYPE_READ = 0;
+  private static final int TEST_TYPE_WRITE = 1;
+  private static final int TEST_TYPE_CLEANUP = 2;
+  private static final int DEFAULT_BUFFER_SIZE = 1000000;
+  private static final String BASE_FILE_NAME = "test_io_";
+  private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
+  
+  private static final Logger LOG = InputFormatBase.LOG;
+  private static Configuration fsConfig = new Configuration();
+  private static final long MEGA = 0x100000;
+  private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO");
+  private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control");
+  private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write");
+  private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
+  private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data");
+
+  /**
+   * Run the test with default parameters.
+   * 
+   * @throws Exception
+   */
+  public void testIOs() throws Exception {
+    testIOs(10, 10);
+  }
+
+  /**
+   * Run the test with the specified parameters.
+   * 
+   * @param fileSize file size
+   * @param nrFiles number of files
+   * @throws IOException
+   */
+  public static void testIOs(int fileSize, int nrFiles)
+    throws IOException {
+
+    FileSystem fs = FileSystem.get(fsConfig);
+
+    createControlFile(fs, fileSize, nrFiles);
+    writeTest(fs);
+    readTest(fs);
+  }
+
+  private static void createControlFile(
+                                        FileSystem fs,
+                                        int fileSize, // in MB 
+                                        int nrFiles
+                                        ) throws IOException {
+    LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files");
+
+    fs.delete(CONTROL_DIR);
+
+    for( int i=0; i < nrFiles; i++ ) {
+      String name = getFileName(i);
+      Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
+      SequenceFile.Writer writer = null;
+      try {
+        writer = new SequenceFile.Writer(fs, controlFile,
+                                         UTF8.class, LongWritable.class);
+        writer.append(new UTF8(name), new LongWritable(fileSize));
+      } catch(Exception e) {
+        throw new IOException(e.getLocalizedMessage());
+      } finally {
+    	if( writer != null )
+          writer.close();
+    	writer = null;
+      }
+    }
+    LOG.info("created control files for: "+nrFiles+" files");
+  }
+
+  private static String getFileName( int fIdx ) {
+    return BASE_FILE_NAME + Integer.toString(fIdx);
+  }
+  
+  /**
+   * Write/Read mapper base class.
+   * <p>
+   * Collects the following statistics per task:
+   * <ul>
+   * <li>number of tasks completed</li>
+   * <li>number of bytes written/read</li>
+   * <li>execution time</li>
+   * <li>i/o rate</li>
+   * <li>i/o rate squared</li>
+   * </ul>
+   */
+  private abstract static class IOStatMapper extends IOMapperBase {
+    IOStatMapper() { 
+      super(fsConfig);
+    }
+    
+    void collectStats(OutputCollector output, 
+                      String name,
+                      long execTime, 
+                      Object objSize ) throws IOException {
+      long totalSize = ((Long)objSize).longValue();
+      float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
+      LOG.info("Number of bytes processed = " + totalSize );
+      LOG.info("Exec time = " + execTime );
+      LOG.info("IO rate = " + ioRateMbSec );
+      
+      output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
+      output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
+      output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
+      output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
+      output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
+    }
+  }
+
+  /**
+   * Write mapper class.
+   */
+  public static class WriteMapper extends IOStatMapper {
+
+    public WriteMapper() { 
+      super(); 
+      for( int i=0; i < bufferSize; i++ )
+        buffer[i] = (byte)('0' + i % 50);
+    }
+
+    public Object doIO( Reporter reporter, 
+                        String name, 
+                        long totalSize 
+                        ) throws IOException {
+      // create file
+      totalSize *= MEGA;
+      OutputStream out;
+      out = fs.create(new Path(DATA_DIR, name), true, bufferSize);
+      
+      try {
+        // write to the file
+        long nrRemaining;
+        for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize ) {
+          int curSize = ( bufferSize < nrRemaining ) ? bufferSize : (int)nrRemaining; 
+          out.write( buffer, 0, curSize );
+          reporter.setStatus( "writing " + name + "@" + 
+                              (totalSize - nrRemaining) + "/" + totalSize 
+                              + " ::host = " + hostName);
+        }
+      } finally {
+        out.close();
+      }
+      return new Long(totalSize);
+    }
+  }
+
+  private static void writeTest(FileSystem fs)
+    throws IOException {
+
+    fs.delete(DATA_DIR);
+    fs.delete(WRITE_DIR);
+    
+    runIOTest( WriteMapper.class, WRITE_DIR );
+  }
+  
+  private static void runIOTest(  Class mapperClass, 
+                                  Path outputDir
+                                  ) throws IOException {
+    JobConf job = new JobConf( fsConfig, TestDFSIO.class );
+
+    job.setInputPath(CONTROL_DIR);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(LongWritable.class);
+
+    job.setMapperClass(mapperClass);
+    job.setReducerClass(AccumulatingReducer.class);
+
+    job.setOutputPath(outputDir);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(UTF8.class);
+    job.setNumReduceTasks(1);
+    JobClient.runJob(job);
+  }
+
+  /**
+   * Read mapper class.
+   */
+  public static class ReadMapper extends IOStatMapper {
+
+    public ReadMapper() { 
+      super(); 
+    }
+
+    public Object doIO( Reporter reporter, 
+                        String name, 
+                        long totalSize 
+                        ) throws IOException {
+      totalSize *= MEGA;
+      // open file
+      DataInputStream in;
+      in = new DataInputStream(fs.open(new Path(DATA_DIR, name)));
+      try {
+        long actualSize = 0;
+        for( int curSize = bufferSize; curSize == bufferSize; ) {
+          curSize = in.read( buffer, 0, bufferSize );
+          actualSize += curSize;
+          reporter.setStatus( "reading " + name + "@" + 
+                              actualSize + "/" + totalSize 
+                              + " ::host = " + hostName);
+        }
+      } finally {
+        in.close();
+      }
+      return new Long(totalSize);
+    }
+  }
+
+  private static void readTest(FileSystem fs) throws IOException {
+    fs.delete(READ_DIR);
+    runIOTest( ReadMapper.class, READ_DIR );
+  }
+
+  private static void sequentialTest(
+                                     FileSystem fs, 
+                                     int testType, 
+                                     int fileSize, 
+                                     int nrFiles
+                                     ) throws Exception {
+    IOStatMapper ioer = null;
+    if( testType == TEST_TYPE_READ )
+      ioer = new ReadMapper();
+    else if( testType == TEST_TYPE_WRITE )
+      ioer = new WriteMapper();
+    else
+      return;
+    for( int i=0; i < nrFiles; i++)
+      ioer.doIO(new Reporter() {
+          public void setStatus(String status) throws IOException {}
+        },
+                BASE_FILE_NAME+Integer.toString(i), 
+                MEGA*fileSize );
+  }
+
+  public static void main(String[] args) {
+    int testType = TEST_TYPE_READ;
+    int bufferSize = DEFAULT_BUFFER_SIZE;
+    int fileSize = 1;
+    int nrFiles = 1;
+    String resFileName = DEFAULT_RES_FILE_NAME;
+    boolean isSequential = false;
+
+    String version="TestDFSIO.0.0.4";
+    String usage = "Usage: TestDFSIO -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] ";
+    
+    System.out.println(version);
+    if (args.length == 0) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+    for (int i = 0; i < args.length; i++) {       // parse command line
+      if (args[i].startsWith("-r")) {
+        testType = TEST_TYPE_READ;
+      } else if (args[i].startsWith("-w")) {
+        testType = TEST_TYPE_WRITE;
+      } else if (args[i].startsWith("-clean")) {
+        testType = TEST_TYPE_CLEANUP;
+      } else if (args[i].startsWith("-seq")) {
+        isSequential = true;
+      } else if (args[i].equals("-nrFiles")) {
+        nrFiles = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-fileSize")) {
+        fileSize = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-bufferSize")) {
+        bufferSize = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-resFile")) {
+        resFileName = args[++i];
+      }
+    }
+
+    LOG.info("nrFiles = " + nrFiles);
+    LOG.info("fileSize (MB) = " + fileSize);
+    LOG.info("bufferSize = " + bufferSize);
+  
+    try {
+      fsConfig.setInt("test.io.file.buffer.size", bufferSize);
+      FileSystem fs = FileSystem.get(fsConfig);
+
+      if( isSequential ) {
+        long tStart = System.currentTimeMillis();
+        sequentialTest( fs, testType, fileSize, nrFiles );
+        long execTime = System.currentTimeMillis() - tStart;
+        String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
+        LOG.info( resultLine );
+        return;
+      }
+      if( testType == TEST_TYPE_CLEANUP ) {
+        cleanup( fs );
+        return;
+      }
+      createControlFile(fs, fileSize, nrFiles);
+      long tStart = System.currentTimeMillis();
+      if( testType == TEST_TYPE_WRITE )
+        writeTest(fs);
+      if( testType == TEST_TYPE_READ )
+        readTest(fs);
+      long execTime = System.currentTimeMillis() - tStart;
+    
+      analyzeResult( fs, testType, execTime, resFileName );
+    } catch( Exception e ) {
+      System.err.print( e.getLocalizedMessage());
+      System.exit(-1);
+    }
+  }
+  
+  private static void analyzeResult(  FileSystem fs, 
+                                      int testType,
+                                      long execTime,
+                                      String resFileName
+                                      ) throws IOException {
+    Path reduceFile;
+    if( testType == TEST_TYPE_WRITE )
+      reduceFile = new Path( WRITE_DIR, "part-00000" );
+    else
+      reduceFile = new Path( READ_DIR, "part-00000" );
+    DataInputStream in;
+    in = new DataInputStream(fs.open( reduceFile ));
+  
+    BufferedReader lines;
+    lines = new BufferedReader(new InputStreamReader(in));
+    long tasks = 0;
+    long size = 0;
+    long time = 0;
+    float rate = 0;
+    float sqrate = 0;
+    String line;
+    while( (line = lines.readLine()) != null ) {
+      StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
+      String attr = tokens.nextToken(); 
+      if( attr.endsWith(":tasks") )
+        tasks = Long.parseLong( tokens.nextToken() );
+      else if( attr.endsWith(":size") )
+        size = Long.parseLong( tokens.nextToken() );
+      else if( attr.endsWith(":time") )
+        time = Long.parseLong( tokens.nextToken() );
+      else if( attr.endsWith(":rate") )
+        rate = Float.parseFloat( tokens.nextToken() );
+      else if( attr.endsWith(":sqrate") )
+        sqrate = Float.parseFloat( tokens.nextToken() );
+    }
+    
+    double med = rate / 1000 / tasks;
+    double stdDev = Math.sqrt( Math.abs(sqrate / 1000 / tasks - med*med ));
+    String resultLines[] = {
+      "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
+                                    (testType == TEST_TYPE_READ) ? "read" : 
+                                    "unknown"),
+      "           Date & time: " + new Date(System.currentTimeMillis()),
+      "       Number of files: " + tasks,
+      "Total MBytes processed: " + size/MEGA,
+      "     Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
+      "Average IO rate mb/sec: " + med,
+      " Std IO rate deviation: " + stdDev,
+      "    Test exec time sec: " + (float)execTime / 1000,
+      "" };
+
+    PrintStream res = new PrintStream( 
+                                      new FileOutputStream( 
+                                                           new File(resFileName), true )); 
+    for( int i = 0; i < resultLines.length; i++ ) {
+      LOG.info( resultLines[i] );
+      res.println( resultLines[i] );
+    }
+  }
+
+  private static void cleanup( FileSystem fs ) throws Exception {
+    LOG.info( "Cleaning up test files" );
+    fs.delete(new Path(TEST_ROOT_DIR));
+  }
+}
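
For completeness (not part of the commit): with the AllTestDriver registration below, the benchmark can be driven by name. Assuming the test classes are on the classpath of the bin/hadoop launcher, a write pass, a read pass and the cleanup would look roughly like this (10 files of 100 MB each, with the flags taken from the usage string above):

  bin/hadoop org.apache.hadoop.test.AllTestDriver TestDFSIO -write -nrFiles 10 -fileSize 100
  bin/hadoop org.apache.hadoop.test.AllTestDriver TestDFSIO -read -nrFiles 10 -fileSize 100
  bin/hadoop org.apache.hadoop.test.AllTestDriver TestDFSIO -clean

Results are appended to TestDFSIO_results.log in the local working directory unless -resFile is given.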

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java?rev=399543&r1=399542&r2=399543&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java Wed May  3 21:39:16 2006
@@ -17,12 +17,6 @@
 package org.apache.hadoop.test;
 
 import org.apache.hadoop.util.ProgramDriver;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.TreeMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
 import org.apache.hadoop.mapred.TestMapRed;
 import org.apache.hadoop.mapred.TestTextInputFormat;
 import org.apache.hadoop.mapred.TestSequenceFileInputFormat;
@@ -33,6 +27,7 @@
 import org.apache.hadoop.io.TestSequenceFile;
 import org.apache.hadoop.ipc.TestIPC;
 import org.apache.hadoop.ipc.TestRPC;
+import org.apache.hadoop.fs.TestDFSIO;
 
 public class AllTestDriver {
   
@@ -54,6 +49,7 @@
 	    pgd.addClass("testipc", TestIPC.class, "A test for ipc.");
 	    pgd.addClass("testsequencefileinputformat", TestSequenceFileInputFormat.class, "A test for sequence file input format.");
 	    pgd.addClass("testtextinputformat", TestTextInputFormat.class, "A test for text input format.");
+      pgd.addClass("TestDFSIO", TestDFSIO.class, "Distributed i/o benchmark.");
 	    pgd.driver(argv);
 	}
 	catch(Throwable e){