You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/12/10 23:43:45 UTC

svn commit: r603084 - in /lucene/hadoop/trunk: CHANGES.txt src/test/org/apache/hadoop/dfs/NNBench.java

Author: dhruba
Date: Mon Dec 10 14:43:44 2007
New Revision: 603084

URL: http://svn.apache.org/viewvc?rev=603084&view=rev
Log:
HADOOP-2000.  Rewrite NNBench to measure namenode performance accurately.
It now uses the map-reduce framework for load generation.
(Mukund Madhugiri via dhruba)


Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=603084&r1=603083&r2=603084&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Dec 10 14:43:44 2007
@@ -109,6 +109,10 @@
     HADOOP-1327.  Include website documentation for streaming. (Rob Weltman
     via omalley)
 
+    HADOOP-2000.  Rewrite NNBench to measure namenode performance accurately.
+    It now uses the map-reduce framework for load generation.
+    (Mukund Madhugiri via dhruba)
+
   OPTIMIZATIONS
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java?rev=603084&r1=603083&r2=603084&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java Mon Dec 10 14:43:44 2007
@@ -20,322 +20,938 @@
 
 import java.io.IOException;
 import java.util.Date;
+import java.io.DataInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.io.File;
+import java.io.BufferedReader;
+import java.util.StringTokenizer;
+import java.net.InetAddress;
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
 
-import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile;
+
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.TaskTracker;
 
 /**
  * This program executes a specified operation that applies load to 
- * the NameNode. Possible operations include create/writing files,
- * opening/reading files, renaming files, and deleting files.
+ * the NameNode.
  * 
  * When run simultaneously on multiple nodes, this program functions 
  * as a stress-test and benchmark for namenode, especially when 
  * the number of bytes written to each file is small.
+ * 
+ * Valid operations are:
+ *   create_write
+ *   open_read
+ *   rename
+ *   delete
+ * 
+ * NOTE: The open_read, rename and delete operations assume that the files
+ *       they operate on are already available. The create_write operation 
+ *       must be run before running the other operations.
  */
+
 public class NNBench {
+  protected static final Log LOG = LogFactory.getLog(
+          "org.apache.hadoop.dfs.NNBench");
+  
+  protected static String CONTROL_DIR_NAME = "control";
+  protected static String OUTPUT_DIR_NAME = "output";
+  protected static String DATA_DIR_NAME = "data";
+  protected static final String DEFAULT_RES_FILE_NAME = "NNBench_results.log";
+  protected static final String NNBENCH_VERSION = "NameNode Benchmark 0.4";
+  
+  public static String operation = "none";
+  public static long numberOfMaps = 1l; // default is 1
+  public static long numberOfReduces = 1l; // default is 1
+  public static long startTime = 
+          System.currentTimeMillis() + (120 * 1000); // default is 'now' + 2min
+  public static long blockSize = 1l; // default is 1
+  public static int bytesToWrite = 0; // default is 0
+  public static long bytesPerChecksum = 1l; // default is 1
+  public static long numberOfFiles = 1l; // default is 1
+  public static short replicationFactorPerFile = 1; // default is 1
+  public static String baseDir = "/benchmarks/NNBench";  // default
+  public static boolean readFileAfterOpen = false; // default is to not read
+  
+  // Supported operations
+  private static final String OP_CREATE_WRITE = "create_write";
+  private static final String OP_OPEN_READ = "open_read";
+  private static final String OP_RENAME = "rename";
+  private static final String OP_DELETE = "delete";
+  
+  // To display in the format that matches the NN and DN log format
+  // Example: 2007-10-26 00:01:19,853
+  static SimpleDateFormat sdf = 
+          new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss','S");
+
+  private static Configuration config = new Configuration();
   
-  private static final Log LOG = LogFactory.getLog(
-                                            "org.apache.hadoop.dfs.NNBench");
+  /**
+   * Clean up the files before a test run
+   * 
+   * @throws IOException on error
+   */
+  private static void cleanupBeforeTestrun() throws IOException {
+    FileSystem tempFS = FileSystem.get(config);
+    
+    // Delete the data directory only if it is the create/write operation
+    if (operation.equals(OP_CREATE_WRITE)) {
+      LOG.info("Deleting data directory");
+      tempFS.delete(new Path(baseDir, DATA_DIR_NAME));
+    }
+    tempFS.delete(new Path(baseDir, CONTROL_DIR_NAME));
+    tempFS.delete(new Path(baseDir, OUTPUT_DIR_NAME));
+  }
   
-  // variable initialzed from command line arguments
-  private static long startTime = 0;
-  private static int numFiles = 0;
-  private static long bytesPerBlock = 1;
-  private static long blocksPerFile = 0;
-  private static long bytesPerFile = 1;
-  private static Path baseDir = null;
-    
-  // variables initialized in main()
-  private static FileSystem fileSys = null;
-  private static Path taskDir = null;
-  private static String uniqueId = null;
-  private static byte[] buffer;
-  private static long maxExceptionsPerFile = 200;
-    
-  /**
-   * Returns when the current number of seconds from the epoch equals
-   * the command line argument given by <code>-startTime</code>.
-   * This allows multiple instances of this program, running on clock
-   * synchronized nodes, to start at roughly the same time.
-   */
-  static void barrier() {
-    long sleepTime;
-    while ((sleepTime = startTime - System.currentTimeMillis()) > 0) {
+  /**
+   * Create control files before a test run.
+   * Number of files created is equal to the number of maps specified
+   * 
+   * @throws IOException on error
+   */
+  private static void createControlFiles() throws IOException {
+    FileSystem tempFS = FileSystem.get(config);
+    LOG.info("Creating " + numberOfMaps + " control files");
+
+    for (int i = 0; i < numberOfMaps; i++) {
+      String strFileName = "NNBench_Controlfile_" + i;
+      Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME),
+              strFileName);
+
+      SequenceFile.Writer writer = null;
       try {
-        Thread.sleep(sleepTime);
-      } catch (InterruptedException ex) {
+        writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class, 
+                LongWritable.class, CompressionType.NONE);
+        writer.append(new Text(strFileName), new LongWritable(0l));
+      } catch(Exception e) {
+        throw new IOException(e.getLocalizedMessage());
+      } finally {
+        if (writer != null) {
+          writer.close();
+        }
+        writer = null;
       }
     }
   }
+  /**
+   * Display version
+   */
+  private static void displayVersion() {
+    System.out.println(NNBENCH_VERSION);
+  }
+  
+  /**
+   * Display usage
+   */
+  private static void displayUsage() {
+    String usage =
+      "Usage: nnbench <options>\n" +
+      "Options:\n" +
+      "\t-operation <Available operations are " + OP_CREATE_WRITE + " " +
+      OP_OPEN_READ + " " + OP_RENAME + " " + OP_DELETE + ". " +
+      "This option is mandatory>\n" +
+      "\t * NOTE: The open_read, rename and delete operations assume " +
+      "that the files they operate on, are already available. " +
+      "The create_write operation must be run before running the " +
+      "other operations.\n" +
+      "\t-maps <number of maps. default is 1. This is not mandatory>\n" +
+      "\t-reduces <number of reduces. default is 1. This is not mandatory>\n" +
+      "\t-startTime <time to start, given in seconds from the epoch. " +
+      "Make sure this is far enough into the future, so all maps " +
+      "(operations) will start at the same time>. " +
+      "default is launch time + 2 mins. This is not mandatory \n" +
+      "\t-blockSize <Block size in bytes. default is 1. " + 
+      "This is not mandatory>\n" +
+      "\t-bytesToWrite <Bytes to write. default is 0. " + 
+      "This is not mandatory>\n" +
+      "\t-bytesPerChecksum <Bytes per checksum for the files. default is 1. " + 
+      "This is not mandatory>\n" +
+      "\t-numberOfFiles <number of files to create. default is 1. " +
+      "This is not mandatory>\n" +
+      "\t-replicationFactorPerFile <Replication factor for the files." +
+        " default is 1. This is not mandatory>\n" +
+      "\t-baseDir <base DFS path. default is /becnhmarks/NNBench. " +
+      "This is not mandatory>\n" +
+      "\t-readFileAfterOpen <true or false. if true, it reads the file and " +
+      "reports the average time to read. This is valid with the open_read " +
+      "operation. default is false. This is not mandatory>\n" +
+      "\t-help: Display the help statement\n";
+      
     
-  static private void handleException(String operation, Throwable e, 
-                                      int singleFileExceptions) {
-    LOG.warn("Exception while " + operation + ": " +
-             StringUtils.stringifyException(e));
-    if (singleFileExceptions >= maxExceptionsPerFile) {
-      throw new RuntimeException(singleFileExceptions + 
-        " exceptions for a single file exceeds threshold. Aborting");
+    System.out.println(usage);
+  }
+
+  /**
+   * check for arguments and fail if the values are not specified
+   */
+  public static void checkArgs(final int index, final int length) {
+    if (index == length) {
+      displayUsage();
+      System.exit(-1);
     }
   }
   
   /**
-   * Create and write to a given number of files.  Repeat each remote
-   * operation until is suceeds (does not throw an exception).
-   *
-   * @return the number of exceptions caught
+   * Parse input arguments
+   * 
+   * @params args Command line inputs
    */
-  static int createWrite() {
-    int totalExceptions = 0;
-    FSDataOutputStream out = null;
-    boolean success = false;
-    for (int index = 0; index < numFiles; index++) {
-      int singleFileExceptions = 0;
-      do { // create file until is succeeds or max exceptions reached
-        try {
-          out = fileSys.create(
-                               new Path(taskDir, "" + index), false, 512, (short)1, bytesPerBlock);
-          success = true;
-        } catch (IOException ioe) { 
-          success=false; 
-          totalExceptions++;
-          handleException("creating file #" + index, ioe, ++singleFileExceptions);
-        }
-      } while (!success);
-      long toBeWritten = bytesPerFile;
-      while (toBeWritten > 0) {
-        int nbytes = (int) Math.min(buffer.length, toBeWritten);
-        toBeWritten -= nbytes;
-        try { // only try once
-          out.write(buffer, 0, nbytes);
-        } catch (IOException ioe) {
-          totalExceptions++;
-          handleException("writing to file #" + index, ioe, ++singleFileExceptions);
-        }
+  public static void parseInputs(final String[] args) {
+    // If there are no command line arguments, exit
+    if (args.length == 0) {
+      displayUsage();
+      System.exit(-1);
+    }
+    
+    // Parse command line args
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-operation")) {
+        operation = args[++i];
+      } else if (args[i].equals("-maps")) {
+        checkArgs(i + 1, args.length);
+        numberOfMaps = Long.parseLong(args[++i]);
+      } else if (args[i].equals("-reduces")) {
+        checkArgs(i + 1, args.length);
+        numberOfReduces = Long.parseLong(args[++i]);
+      } else if (args[i].equals("-startTime")) {
+        checkArgs(i + 1, args.length);
+        startTime = Long.parseLong(args[++i]) * 1000;
+      } else if (args[i].equals("-blockSize")) {
+        checkArgs(i + 1, args.length);
+        blockSize = Long.parseLong(args[++i]);
+      } else if (args[i].equals("-bytesToWrite")) {
+        checkArgs(i + 1, args.length);
+        bytesToWrite = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-bytesPerChecksum")) {
+        checkArgs(i + 1, args.length);
+        bytesPerChecksum = Long.parseLong(args[++i]);
+      } else if (args[i].equals("-numberOfFiles")) {
+        checkArgs(i + 1, args.length);
+        numberOfFiles = Long.parseLong(args[++i]);
+      } else if (args[i].equals("-replicationFactorPerFile")) {
+        checkArgs(i + 1, args.length);
+        replicationFactorPerFile = Short.parseShort(args[++i]);
+      } else if (args[i].equals("-baseDir")) {
+        checkArgs(i + 1, args.length);
+        baseDir = args[++i];
+      } else if (args[i].equals("-readFileAfterOpen")) {
+        checkArgs(i + 1, args.length);
+        readFileAfterOpen = Boolean.parseBoolean(args[++i]);
+      } else if (args[i].equals("-help")) {
+        displayUsage();
+        System.exit(-1);
       }
-      do { // close file until is succeeds
-        try {
-          out.close();
-          success = true;
-        } catch (IOException ioe) {
-          success=false; 
-          totalExceptions++;
-          handleException("closing file #" + index, ioe, ++singleFileExceptions);
-        }
-      } while (!success);
     }
-    return totalExceptions;
-  }
     
+    LOG.info("Test Inputs: ");
+    LOG.info("           Test Operation: " + operation);
+    LOG.info("               Start time: " + sdf.format(new Date(startTime)));
+    LOG.info("           Number of maps: " + numberOfMaps);
+    LOG.info("        Number of reduces: " + numberOfReduces);
+    LOG.info("               Block Size: " + blockSize);
+    LOG.info("           Bytes to write: " + bytesToWrite);
+    LOG.info("       Bytes per checksum: " + bytesPerChecksum);
+    LOG.info("          Number of files: " + numberOfFiles);
+    LOG.info("       Replication factor: " + replicationFactorPerFile);
+    LOG.info("                 Base dir: " + baseDir);
+    LOG.info("     Read file after open: " + readFileAfterOpen);
+    
+    // Set user-defined parameters, so the map method can access the values
+    config.set("test.nnbench.operation", operation);
+    config.setLong("test.nnbench.maps", numberOfMaps);
+    config.setLong("test.nnbench.reduces", numberOfReduces);
+    config.setLong("test.nnbench.starttime", startTime);
+    config.setLong("test.nnbench.blocksize", blockSize);
+    config.setInt("test.nnbench.bytestowrite", bytesToWrite);
+    config.setLong("test.nnbench.bytesperchecksum", bytesPerChecksum);
+    config.setLong("test.nnbench.numberoffiles", numberOfFiles);
+    config.setInt("test.nnbench.replicationfactor", 
+            (int) replicationFactorPerFile);
+    config.set("test.nnbench.basedir", baseDir);
+    config.setBoolean("test.nnbench.readFileAfterOpen", readFileAfterOpen);
+
+    config.set("test.nnbench.datadir.name", DATA_DIR_NAME);
+    config.set("test.nnbench.outputdir.name", OUTPUT_DIR_NAME);
+    config.set("test.nnbench.controldir.name", CONTROL_DIR_NAME);
+  }
+  
   /**
-   * Open and read a given number of files.
-   *
-   * @return the number of exceptions caught
+   * Analyze the results
+   * 
+   * @throws IOException on error
    */
-  static int openRead() {
-    int totalExceptions = 0;
-    FSDataInputStream in = null;
-    for (int index = 0; index < numFiles; index++) {
-      int singleFileExceptions = 0;
-      try {
-        in = fileSys.open(new Path(taskDir, "" + index), 512);
-        long toBeRead = bytesPerFile;
-        while (toBeRead > 0) {
-          int nbytes = (int) Math.min(buffer.length, toBeRead);
-          toBeRead -= nbytes;
-          try { // only try once
-            in.read(buffer, 0, nbytes);
-          } catch (IOException ioe) {
-            totalExceptions++;
-            handleException("reading from file #" + index, ioe, ++singleFileExceptions);
-          }
-        }
-        in.close();
-      } catch (IOException ioe) { 
-        totalExceptions++;
-        handleException("opening file #" + index, ioe, ++singleFileExceptions);
+  private static void analyzeResults() throws IOException {
+    final FileSystem fs = FileSystem.get(config);
+    Path reduceFile = new Path(new Path(baseDir, OUTPUT_DIR_NAME),
+            "part-00000");
+
+    DataInputStream in;
+    in = new DataInputStream(fs.open(reduceFile));
+
+    BufferedReader lines;
+    lines = new BufferedReader(new InputStreamReader(in));
+
+    long totalTimeAL1 = 0l;
+    long totalTimeAL2 = 0l;
+    long totalTimeTPmS = 0l;
+    long lateMaps = 0l;
+    long numOfExceptions = 0l;
+    long successfulFileOps = 0l;
+    
+    long mapStartTimeTPmS = 0l;
+    long mapEndTimeTPmS = 0l;
+    
+    String resultTPSLine1 = null;
+    String resultTPSLine2 = null;
+    String resultALLine1 = null;
+    String resultALLine2 = null;
+    
+    String line;
+    while((line = lines.readLine()) != null) {
+      StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%;");
+      String attr = tokens.nextToken();
+      if (attr.endsWith(":totalTimeAL1")) {
+        totalTimeAL1 = Long.parseLong(tokens.nextToken());
+      } else if (attr.endsWith(":totalTimeAL2")) {
+        totalTimeAL2 = Long.parseLong(tokens.nextToken());
+      } else if (attr.endsWith(":totalTimeTPmS")) {
+        totalTimeTPmS = Long.parseLong(tokens.nextToken());
+      } else if (attr.endsWith(":latemaps")) {
+        lateMaps = Long.parseLong(tokens.nextToken());
+      } else if (attr.endsWith(":numOfExceptions")) {
+        numOfExceptions = Long.parseLong(tokens.nextToken());
+      } else if (attr.endsWith(":successfulFileOps")) {
+        successfulFileOps = Long.parseLong(tokens.nextToken());
+      } else if (attr.endsWith(":mapStartTimeTPmS")) {
+        mapStartTimeTPmS = Long.parseLong(tokens.nextToken());
+      } else if (attr.endsWith(":mapEndTimeTPmS")) {
+        mapEndTimeTPmS = Long.parseLong(tokens.nextToken());
       }
     }
-    return totalExceptions;
-  }
     
+    // Average latency is the average time to perform 'n' number of
+    // operations, n being the number of files
+    double avgLatency1 = (double) totalTimeAL1 / (double) successfulFileOps;
+    double avgLatency2 = (double) totalTimeAL2 / (double) successfulFileOps;
+    
+    // The time it takes for the longest running map is measured. Using that,
+    // cluster transactions per second is calculated. It includes time to 
+    // retry any of the failed operations
+    double longestMapTimeTPmS = (double) (mapEndTimeTPmS - mapStartTimeTPmS);
+    double totalTimeTPS = (longestMapTimeTPmS == 0) ?
+            (1000 * successfulFileOps) :
+            (double) (1000 * successfulFileOps) / (double) longestMapTimeTPmS;
+            
+    // The time it takes to perform 'n' operations is calculated (in ms),
+    // n being the number of files. Using that time, the average execution 
+    // time is calculated. It includes time to retry any of the
+    // failed operations
+    double AverageExecutionTime = (totalTimeTPmS == 0) ?
+        (double) successfulFileOps : 
+        (double) (totalTimeTPmS / successfulFileOps);
+            
+    if (operation.equals(OP_CREATE_WRITE)) {
+      // For create/write/close, it is treated as two transactions,
+      // since a file create from a client perspective involves create and close
+      resultTPSLine1 = "               TPS: Create/Write/Close: " + 
+        (int) (totalTimeTPS * 2);
+      resultTPSLine2 = "Avg exec time (ms): Create/Write/Close: " + 
+        (double) AverageExecutionTime;
+      resultALLine1 = "            Avg Lat (ms): Create/Write: " + avgLatency1;
+      resultALLine2 = "                   Avg Lat (ms): Close: " + avgLatency2;
+    } else if (operation.equals(OP_OPEN_READ)) {
+      resultTPSLine1 = "                        TPS: Open/Read: " + 
+        (int) totalTimeTPS;
+      resultTPSLine2 = "         Avg Exec time (ms): Open/Read: " + 
+        (double) AverageExecutionTime;
+      resultALLine1 = "                    Avg Lat (ms): Open: " + avgLatency1;
+      if (readFileAfterOpen) {
+        resultALLine2 = "                  Avg Lat (ms): Read: " + avgLatency2;
+      }
+    } else if (operation.equals(OP_RENAME)) {
+      resultTPSLine1 = "                           TPS: Rename: " + 
+        (int) totalTimeTPS;
+      resultTPSLine2 = "            Avg Exec time (ms): Rename: " + 
+        (double) AverageExecutionTime;
+      resultALLine1 = "                  Avg Lat (ms): Rename: " + avgLatency1;
+    } else if (operation.equals(OP_DELETE)) {
+      resultTPSLine1 = "                           TPS: Delete: " + 
+        (int) totalTimeTPS;
+      resultTPSLine2 = "            Avg Exec time (ms): Delete: " + 
+        (double) AverageExecutionTime;
+      resultALLine1 = "                  Avg Lat (ms): Delete: " + avgLatency1;
+    }
+    
+    String resultLines[] = {
+    "-------------- NNBench -------------- : ",
+    "                               Version: " + NNBENCH_VERSION,
+    "                           Date & time: " + sdf.format(new Date(
+            System.currentTimeMillis())),
+    "",
+    "                        Test Operation: " + operation,
+    "                            Start time: " + 
+      sdf.format(new Date(startTime)),
+    "                           Maps to run: " + numberOfMaps,
+    "                        Reduces to run: " + numberOfReduces,
+    "                    Block Size (bytes): " + blockSize,
+    "                        Bytes to write: " + bytesToWrite,
+    "                    Bytes per checksum: " + bytesPerChecksum,
+    "                       Number of files: " + numberOfFiles,
+    "                    Replication factor: " + replicationFactorPerFile,
+    "            Successful file operations: " + successfulFileOps,
+    "",
+    "        # maps that missed the barrier: " + lateMaps,
+    "                          # exceptions: " + numOfExceptions,
+    "",
+    resultTPSLine1,
+    resultTPSLine2,
+    resultALLine1,
+    resultALLine2,
+    "",
+    "                 RAW DATA: AL Total #1: " + totalTimeAL1,
+    "                 RAW DATA: AL Total #2: " + totalTimeAL2,
+    "              RAW DATA: TPS Total (ms): " + totalTimeTPmS,
+    "       RAW DATA: Longest Map Time (ms): " + longestMapTimeTPmS,
+    "                   RAW DATA: Late maps: " + lateMaps,
+    "             RAW DATA: # of exceptions: " + numOfExceptions,
+    "" };
+
+    PrintStream res = new PrintStream(new FileOutputStream(
+            new File(DEFAULT_RES_FILE_NAME), true));
+    
+    // Write to a file and also dump to log
+    for(int i = 0; i < resultLines.length; i++) {
+      LOG.info(resultLines[i]);
+      res.println(resultLines[i]);
+    }
+  }
+  
   /**
-   * Rename a given number of files.  Repeat each remote
-   * operation until is suceeds (does not throw an exception).
-   *
-   * @return the number of exceptions caught
+   * Run the test
+   * 
+   * @throws IOException on error
    */
-  static int rename() {
-    int totalExceptions = 0;
-    boolean success = false;
-    for (int index = 0; index < numFiles; index++) {
-      int singleFileExceptions = 0;
-      do { // rename file until is succeeds
-        try {
-          boolean result = fileSys.rename(
-                                          new Path(taskDir, "" + index), new Path(taskDir, "A" + index));
-          success = true;
-        } catch (IOException ioe) { 
-          success=false; 
-          totalExceptions++;
-          handleException("creating file #" + index, ioe, ++singleFileExceptions);
-       }
-      } while (!success);
-    }
-    return totalExceptions;
-  }
+  public static void runTests() throws IOException {
+    config.setLong("io.bytes.per.checksum", bytesPerChecksum);
+    
+    JobConf job = new JobConf(config, NNBench.class);
+
+    job.setJobName("NNBench-" + operation);
+    job.setInputPath(new Path(baseDir, CONTROL_DIR_NAME));
+    job.setInputFormat(SequenceFileInputFormat.class);
+    
+    // Explicitly set number of max map attempts to 1.
+    job.setMaxMapAttempts(1);
     
+    // Explicitly turn off speculative execution
+    job.setSpeculativeExecution(false);
+
+    job.setMapperClass(NNBenchMapper.class);
+    job.setReducerClass(NNBenchReducer.class);
+
+    job.setOutputPath(new Path(baseDir, OUTPUT_DIR_NAME));
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumReduceTasks((int) numberOfReduces);
+    JobClient.runJob(job);
+  }
+  
   /**
-   * Delete a given number of files.  Repeat each remote
-   * operation until is suceeds (does not throw an exception).
-   *
-   * @return the number of exceptions caught
-   */
-  static int delete() {
-    int totalExceptions = 0;
-    boolean success = false;
-    for (int index = 0; index < numFiles; index++) {
-      int singleFileExceptions = 0;
-      do { // delete file until is succeeds
-        try {
-          boolean result = fileSys.delete(new Path(taskDir, "A" + index));
-          success = true;
-        } catch (IOException ioe) { 
-          success=false; 
-          totalExceptions++;
-          handleException("creating file #" + index, ioe, ++singleFileExceptions);
-        }
-      } while (!success);
+   * Validate the inputs
+   */
+  public static void validateInputs() {
+    // If it is not one of the four operations, then fail
+    if (!operation.equals(OP_CREATE_WRITE) &&
+            !operation.equals(OP_OPEN_READ) &&
+            !operation.equals(OP_RENAME) &&
+            !operation.equals(OP_DELETE)) {
+      System.err.println("Error: Unknown operation: " + operation);
+      displayUsage();
+      System.exit(-1);
+    }
+    
+    // If number of maps is a negative number, then fail
+    // Hadoop allows the number of maps to be 0
+    if (numberOfMaps < 0) {
+      System.err.println("Error: Number of maps must be a positive number");
+      displayUsage();
+      System.exit(-1);
+    }
+    
+    // If number of reduces is a negative number or 0, then fail
+    if (numberOfReduces <= 0) {
+      System.err.println("Error: Number of reduces must be a positive number");
+      displayUsage();
+      System.exit(-1);
+    }
+
+    // If blocksize is a negative number or 0, then fail
+    if (blockSize <= 0) {
+      System.err.println("Error: Block size must be a positive number");
+      displayUsage();
+      System.exit(-1);
+    }
+    
+    // If bytes to write is a negative number, then fail
+    if (bytesToWrite < 0) {
+      System.err.println("Error: Bytes to write must be a positive number");
+      displayUsage();
+      System.exit(-1);
+    }
+    
+    // If bytes per checksum is a negative number, then fail
+    if (bytesPerChecksum < 0) {
+      System.err.println("Error: Bytes per checksum must be a positive number");
+      displayUsage();
+      System.exit(-1);
+    }
+    
+    // If number of files is a negative number, then fail
+    if (numberOfFiles < 0) {
+      System.err.println("Error: Number of files must be a positive number");
+      displayUsage();
+      System.exit(-1);
+    }
+    
+    // If replication factor is a negative number, then fail
+    if (replicationFactorPerFile < 0) {
+      System.err.println("Error: Replication factor must be a positive number");
+      displayUsage();
+      System.exit(-1);
+    }
+    
+    // If block size is not a multiple of bytesperchecksum, fail
+    if (blockSize % bytesPerChecksum != 0) {
+      System.err.println("Error: Block Size in bytes must be a multiple of " +
+              "bytes per checksum: ");
+      displayUsage();
+      System.exit(-1);
     }
-    return totalExceptions;
   }
+  /**
+  * Main method for running the NNBench benchmarks
+  *
+  * @throws IOException indicates a problem with test startup
+  */
+  public static void main(String[] args) throws IOException {
+    // Display the application version string
+    displayVersion();
+
+    // Parse the inputs
+    parseInputs(args);
+    
+    // Validate inputs
+    validateInputs();
+    
+    // Clean up files before the test run
+    cleanupBeforeTestrun();
+    
+    // Create control files before test run
+    createControlFiles();
+
+    // Run the tests as a map reduce job
+    runTests();
     
+    // Analyze results
+    analyzeResults();
+  }
+
+  
   /**
-   * This launches a given namenode operation (<code>-operation</code>),
-   * starting at a given time (<code>-startTime</code>).  The files used
-   * by the openRead, rename, and delete operations are the same files
-   * created by the createWrite operation.  Typically, the program
-   * would be run four times, once for each operation in this order:
-   * createWrite, openRead, rename, delete.
-   *
-   * <pre>
-   * Usage: nnbench 
-   *          -operation <one of createWrite, openRead, rename, or delete>
-   *          -baseDir <base output/input DFS path>
-   *          -startTime <time to start, given in seconds from the epoch>
-   *          -numFiles <number of files to create, read, rename, or delete>
-   *          -blocksPerFile <number of blocks to create per file>
-   *         [-bytesPerBlock <number of bytes to write to each block, default is 1>]
-   *         [-bytesPerChecksum <value for io.bytes.per.checksum>]
-   * </pre>
-   *
-   * @throws IOException indicates a problem with test startup
+   * Mapper class
    */
-  public static void main(String[] args) throws IOException {
-    String version = "NameNodeBenchmark.0.3";
-    System.out.println(version);
-    int bytesPerChecksum = -1;
+  static class NNBenchMapper extends Configured 
+          implements Mapper<Text, LongWritable, Text, Text> {
+    FileSystem filesystem = null;
+    private String hostName = null;
+
+    long numberOfFiles = 1l;
+    long blkSize = 1l;
+    short replFactor = 1;
+    int bytesToWrite = 0;
+    String baseDir = null;
+    String dataDirName = null;
+    String op = null;
+    boolean readFile = false;
+    final int MAX_OPERATION_EXCEPTIONS = 1000;
+    
+    // Data to collect from the operation
+    int numOfExceptions = 0;
+    long startTimeAL = 0l;
+    long totalTimeAL1 = 0l;
+    long totalTimeAL2 = 0l;
+    long successfulFileOps = 0l;
+    
+    /**
+     * Constructor
+     */
+    public NNBenchMapper() {
+      super(config);
+      
+      try {
+        filesystem = FileSystem.get(config);
+      } catch(Exception e) {
+        throw new RuntimeException("Cannot get file system.", e);
+      }
+      
+      try {
+        hostName = InetAddress.getLocalHost().getHostName();
+      } catch(Exception e) {
+        throw new RuntimeException("Error getting hostname", e);
+      }
+    }
     
-    String usage =
-      "Usage: nnbench " +
-      "  -operation <one of createWrite, openRead, rename, or delete> " +
-      "  -baseDir <base output/input DFS path> " +
-      "  -startTime <time to start, given in seconds from the epoch> " +
-      "  -numFiles <number of files to create> " +
-      "  -blocksPerFile <number of blocks to create per file> " +
-      "  [-bytesPerBlock <number of bytes to write to each block, default is 1>] " +
-      "  [-bytesPerChecksum <value for io.bytes.per.checksum>]" +
-      "Note: bytesPerBlock MUST be a multiple of bytesPerChecksum";
-    
-    String operation = null;
-    for (int i = 0; i < args.length; i++) { // parse command line
-      if (args[i].equals("-baseDir")) {
-        baseDir = new Path(args[++i]);
-      } else if (args[i].equals("-numFiles")) {
-        numFiles = Integer.parseInt(args[++i]);
-      } else if (args[i].equals("-blocksPerFile")) {
-        blocksPerFile = Integer.parseInt(args[++i]);
-      } else if (args[i].equals("-bytesPerBlock")) {
-        bytesPerBlock = Long.parseLong(args[++i]);
-      } else if (args[i].equals("-bytesPerChecksum")) {
-        bytesPerChecksum = Integer.parseInt(args[++i]);        
-      } else if (args[i].equals("-startTime")) {
-        startTime = Long.parseLong(args[++i]) * 1000;
-      } else if (args[i].equals("-operation")) {
-        operation = args[++i];
+    /**
+     * Mapper base implementation
+     */
+    public void configure(JobConf conf) {
+      setConf(conf);
+    }
+    
+    /**
+     * Mapper base implementation
+     */
+    public void close() throws IOException {
+    }
+    
+    /**
+    * Returns when the current number of seconds from the epoch equals
+    * the command line argument given by <code>-startTime</code>.
+    * This allows multiple instances of this program, running on clock
+    * synchronized nodes, to start at roughly the same time.
+    */
+    private boolean barrier() {
+      Configuration conf = filesystem.getConf();
+      long startTime = conf.getLong("test.nnbench.starttime", 0l);
+      long currentTime = System.currentTimeMillis();
+      long sleepTime = startTime - currentTime;
+      boolean retVal = false;
+      
+      // If the sleep time is greater than 0, then sleep and return
+      if (sleepTime > 0) {
+        TaskTracker.LOG.info("Waiting in barrier for: " + sleepTime + " ms");
+      
+        try {
+          Thread.sleep(sleepTime);
+          retVal = true;
+        } catch (Exception e) {
+          retVal = false;
+        }
+      }
+      
+      return retVal;
+    }
+    
+    /**
+     * Map method
+     */ 
+    public void map(Text key, 
+            LongWritable value,
+            OutputCollector<Text, Text> output,
+            Reporter reporter) throws IOException {
+      Configuration conf = filesystem.getConf();
+      
+      numberOfFiles = conf.getLong("test.nnbench.numberoffiles", 1l);
+      blkSize = conf.getLong("test.nnbench.blocksize", 1l);
+      replFactor = (short) (conf.getInt("test.nnbench.replicationfactor", 1));
+      bytesToWrite = conf.getInt("test.nnbench.bytestowrite", 0);
+      baseDir = conf.get("test.nnbench.basedir");
+      dataDirName = conf.get("test.nnbench.datadir.name");
+      op = conf.get("test.nnbench.operation");
+      readFile = conf.getBoolean("test.nnbench.readFileAfterOpen", false);
+      
+      long totalTimeTPmS = 0l;
+      long startTimeTPmS = 0l;
+      long endTimeTPms = 0l;
+      
+      numOfExceptions = 0;
+      startTimeAL = 0l;
+      totalTimeAL1 = 0l;
+      totalTimeAL2 = 0l;
+      successfulFileOps = 0l;
+      
+      if (barrier()) {
+        if (op.equals(OP_CREATE_WRITE)) {
+          startTimeTPmS = System.currentTimeMillis();
+          doCreateWriteOp("file_" + hostName + "_", output, reporter);
+        } else if (op.equals(OP_OPEN_READ)) {
+          startTimeTPmS = System.currentTimeMillis();
+          doOpenReadOp("file_" + hostName + "_", output, reporter);
+        } else if (op.equals(OP_RENAME)) {
+          startTimeTPmS = System.currentTimeMillis();
+          doRenameOp("file_" + hostName + "_", output, reporter);
+        } else if (op.equals(OP_DELETE)) {
+          startTimeTPmS = System.currentTimeMillis();
+          doDeleteOp("file_" + hostName + "_", output, reporter);
+        }
+        
+        endTimeTPms = System.currentTimeMillis();
+        totalTimeTPmS = endTimeTPms - startTimeTPmS;
       } else {
-        System.out.println(usage);
-        System.exit(-1);
+        output.collect(new Text("l:latemaps"), new Text("1"));
       }
+      
+      // collect after the map end time is measured
+      output.collect(new Text("l:totalTimeAL1"), 
+          new Text(String.valueOf(totalTimeAL1)));
+      output.collect(new Text("l:totalTimeAL2"), 
+          new Text(String.valueOf(totalTimeAL2)));
+      output.collect(new Text("l:numOfExceptions"), 
+          new Text(String.valueOf(numOfExceptions)));
+      output.collect(new Text("l:successfulFileOps"), 
+          new Text(String.valueOf(successfulFileOps)));
+      output.collect(new Text("l:totalTimeTPmS"), 
+              new Text(String.valueOf(totalTimeTPmS)));
+      output.collect(new Text("min:mapStartTimeTPmS"), 
+          new Text(String.valueOf(startTimeTPmS)));
+      output.collect(new Text("max:mapEndTimeTPmS"), 
+          new Text(String.valueOf(endTimeTPms)));
     }
-    bytesPerFile = bytesPerBlock * blocksPerFile;
     
-    JobConf jobConf = new JobConf(new Configuration(), NNBench.class);
+    /**
+     * Create and Write operation.
+     */
+    private void doCreateWriteOp(String name,
+            OutputCollector<Text, Text> output,
+            Reporter reporter) {
+      FSDataOutputStream out = null;
+      byte[] buffer = new byte[bytesToWrite];
+      
+      for (long l = 0l; l < numberOfFiles; l++) {
+        Path filePath = new Path(new Path(baseDir, dataDirName), 
+                name + "_" + l);
+
+        boolean successfulOp = false;
+        while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
+          try {
+            // Set up timer for measuring AL (transaction #1)
+            startTimeAL = System.currentTimeMillis();
+            // Create the file
+            // Use a buffer size of 512
+            out = filesystem.create(filePath, 
+                    true, 
+                    512, 
+                    replFactor, 
+                    blkSize);
+            out.write(buffer);
+            totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
+
+            // Close the file / file output stream
+            // Set up timers for measuring AL (transaction #2)
+            startTimeAL = System.currentTimeMillis();
+            out.close();
+            
+            totalTimeAL2 += (System.currentTimeMillis() - startTimeAL);
+            successfulOp = true;
+            successfulFileOps ++;
+          } catch (IOException e) {
+            TaskTracker.LOG.info("Exception recorded in op: " +
+                    "Create/Write/Close");
+ 
+            numOfExceptions++;
+          }
+        }
+      }
+    }
     
-    if ( bytesPerChecksum < 0 ) { // if it is not set in cmdline
-      bytesPerChecksum = jobConf.getInt("io.bytes.per.checksum", 512);
+    /**
+     * Open operation
+     */
+    private void doOpenReadOp(String name,
+            OutputCollector<Text, Text> output,
+            Reporter reporter) {
+      FSDataInputStream input = null;
+      byte[] buffer = new byte[bytesToWrite];
+      
+      for (long l = 0l; l < numberOfFiles; l++) {
+        Path filePath = new Path(new Path(baseDir, dataDirName), 
+                name + "_" + l);
+
+        boolean successfulOp = false;
+        while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
+          try {
+            // Set up timer for measuring AL
+            startTimeAL = System.currentTimeMillis();
+            input = filesystem.open(filePath);
+            totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
+            
+            // If the file needs to be read (specified at command line)
+            if (readFile) {
+              startTimeAL = System.currentTimeMillis();
+              input.readFully(buffer);
+
+              totalTimeAL2 += (System.currentTimeMillis() - startTimeAL);
+            }
+            input.close();
+            successfulOp = true;
+            successfulFileOps ++;
+          } catch (IOException e) {
+            TaskTracker.LOG.info("Exception recorded in op: OpenRead " + e);
+            numOfExceptions++;
+          }
+        }
+      }
     }
-    jobConf.set("io.bytes.per.checksum", Integer.toString(bytesPerChecksum));
     
-    System.out.println("Inputs: ");
-    System.out.println("   operation: " + operation);
-    System.out.println("   baseDir: " + baseDir);
-    System.out.println("   startTime: " + startTime);
-    System.out.println("   numFiles: " + numFiles);
-    System.out.println("   blocksPerFile: " + blocksPerFile);
-    System.out.println("   bytesPerBlock: " + bytesPerBlock);
-    System.out.println("   bytesPerChecksum: " + bytesPerChecksum);
-    
-    if (operation == null ||  // verify args
-        baseDir == null ||
-        numFiles < 1 ||
-        blocksPerFile < 1 ||
-        bytesPerBlock < 0 ||
-        bytesPerBlock % bytesPerChecksum != 0)
-      {
-        System.err.println(usage);
-        System.exit(-1);
+    /**
+     * Rename operation
+     */
+    private void doRenameOp(String name,
+            OutputCollector<Text, Text> output,
+            Reporter reporter) {
+      for (long l = 0l; l < numberOfFiles; l++) {
+        Path filePath = new Path(new Path(baseDir, dataDirName), 
+                name + "_" + l);
+        Path filePathR = new Path(new Path(baseDir, dataDirName), 
+                name + "_r_" + l);
+
+        boolean successfulOp = false;
+        while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
+          try {
+            // Set up timer for measuring AL
+            startTimeAL = System.currentTimeMillis();
+            filesystem.rename(filePath, filePathR);
+            totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
+            
+            successfulOp = true;
+            successfulFileOps ++;
+          } catch (IOException e) {
+            TaskTracker.LOG.info("Exception recorded in op: Rename");
+
+            numOfExceptions++;
+          }
+        }
       }
+    }
     
-    fileSys = FileSystem.get(jobConf);
-    uniqueId = java.net.InetAddress.getLocalHost().getHostName();
-    taskDir = new Path(baseDir, uniqueId);
-    // initialize buffer used for writing/reading file
-    buffer = new byte[(int) Math.min(bytesPerFile, 32768L)];
-    
-    Date execTime;
-    Date endTime;
-    long duration;
-    int exceptions = 0;
-    barrier(); // wait for coordinated start time
-    execTime = new Date();
-    System.out.println("Job started: " + startTime);
-    if (operation.equals("createWrite")) {
-      if (!fileSys.mkdirs(taskDir)) {
-        throw new IOException("Mkdirs failed to create " + taskDir.toString());
-      }
-      exceptions = createWrite();
-    } else if (operation.equals("openRead")) {
-      exceptions = openRead();
-    } else if (operation.equals("rename")) {
-      exceptions = rename();
-    } else if (operation.equals("delete")) {
-      exceptions = delete();
-    } else {
-      System.err.println(usage);
-      System.exit(-1);
-    }
-    endTime = new Date();
-    System.out.println("Job ended: " + endTime);
-    duration = (endTime.getTime() - execTime.getTime()) /1000;
-    System.out.println("The " + operation + " job took " + duration + " seconds.");
-    System.out.println("The job recorded " + exceptions + " exceptions.");
+    /**
+     * Delete operation
+     */
+    private void doDeleteOp(String name,
+            OutputCollector<Text, Text> output,
+            Reporter reporter) {
+      for (long l = 0l; l < numberOfFiles; l++) {
+        Path filePath = new Path(new Path(baseDir, dataDirName), 
+                name + "_" + l);
+        
+        boolean successfulOp = false;
+        while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
+          try {
+            // Set up timer for measuring AL
+            startTimeAL = System.currentTimeMillis();
+            filesystem.delete(filePath);
+            totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
+            
+            successfulOp = true;
+            successfulFileOps ++;
+          } catch (IOException e) {
+            TaskTracker.LOG.info("Exception in recorded op: Delete");
+
+            numOfExceptions++;
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Reducer class
+   */
+  static class NNBenchReducer extends MapReduceBase
+      implements Reducer<Text, Text, Text, Text> {
+
+    protected String hostName;
+
+    public NNBenchReducer () {
+      TaskTracker.LOG.info("Starting NNBenchReducer !!!");
+      try {
+        hostName = java.net.InetAddress.getLocalHost().getHostName();
+      } catch(Exception e) {
+        hostName = "localhost";
+      }
+      TaskTracker.LOG.info("Starting NNBenchReducer on " + hostName);
+    }
+
+    /**
+     * Reduce method
+     */
+    public void reduce(Text key, 
+                       Iterator<Text> values,
+                       OutputCollector<Text, Text> output, 
+                       Reporter reporter
+                       ) throws IOException {
+      String field = key.toString();
+      
+      reporter.setStatus("starting " + field + " ::host = " + hostName);
+      
+      // sum long values
+      if (field.startsWith("l:")) {
+        long lSum = 0;
+        while (values.hasNext()) {
+          lSum += Long.parseLong(values.next().toString());
+        }
+        output.collect(key, new Text(String.valueOf(lSum)));
+      }
+      
+      if (field.startsWith("min:")) {
+        long minVal = -1;
+        while (values.hasNext()) {
+          long value = Long.parseLong(values.next().toString());
+          
+          if (minVal == -1) {
+            minVal = value;
+          } else {
+            if (value != 0 && value < minVal) {
+              minVal = value;
+            }
+          }
+        }
+        output.collect(key, new Text(String.valueOf(minVal)));
+      }
+      
+      if (field.startsWith("max:")) {
+        long maxVal = -1;
+        while (values.hasNext()) {
+          long value = Long.parseLong(values.next().toString());
+          
+          if (maxVal == -1) {
+            maxVal = value;
+          } else {
+            if (value > maxVal) {
+              maxVal = value;
+            }
+          }
+        }
+        output.collect(key, new Text(String.valueOf(maxVal)));
+      }
+      
+      reporter.setStatus("finished " + field + " ::host = " + hostName);
+    }
   }
 }