You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/07 19:12:20 UTC

svn commit: r483588 - in /lucene/hadoop/trunk: CHANGES.txt src/examples/org/apache/hadoop/examples/NNBench.java

Author: cutting
Date: Thu Dec  7 10:12:19 2006
New Revision: 483588

URL: http://svn.apache.org/viewvc?view=rev&rev=483588
Log:
HADOOP-763.  Change DFS namenode benchmark to not use MapReduce.  Contributed by Nigel.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=483588&r1=483587&r2=483588
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Dec  7 10:12:19 2006
@@ -8,6 +8,9 @@
  2. HADOOP-779. Fix contrib/streaming to work correctly with gzipped
     input files.  (Hairong Kuang via cutting)
 
+ 3. HADOOP-763. Change DFS namenode benchmark to not use MapReduce.
+    (Nigel Daley via cutting)
+
 
 Release 0.9.0 - 2006-12-01
 

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java?view=diff&rev=483588&r1=483587&r2=483588
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java Thu Dec  7 10:12:19 2006
@@ -20,206 +20,269 @@
 
 import java.io.IOException;
 import java.util.Date;
-import java.util.Iterator;
-import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.util.Progressable;
 
 /**
- * This program uses map/reduce to run a distributed job where there is
- * no interaction between the tasks.  Each task creates a configurable 
- * number of files.  Each file has a configurable number of bytes 
- * written to it, then it is closed, re-opened, and read from, and
- * re-closed.  This program functions as a stress-test and benchmark
- * for namenode, especially when the number of bytes written to
- * each file is small.
+ * This program executes a specified operation that applies load to 
+ * the NameNode. Possible operations include create/writing files,
+ * opening/reading files, renaming files, and deleting files.
  * 
- * @author Milind Bhandarkar
+ * When run simultaneously on multiple nodes, this program functions 
+ * as a stress-test and benchmark for namenode, especially when 
+ * the number of bytes written to each file is small.
+ *
+ * @author Nigel Daley
  */
-public class NNBench extends MapReduceBase implements Reducer {
-  
-  public static class Map extends MapReduceBase implements Mapper {
-    private FileSystem fileSys = null;
-    private int numBytesToWrite;
-    private Random random = new Random();
-    private String taskId = null;
-    private Path topDir = null;
-    
-    private void randomizeBytes(byte[] data, int offset, int length) {
-      for(int i=offset + length - 1; i >= offset; --i) {
-        data[i] = (byte) random.nextInt(256);
+public class NNBench {
+    // variable initialzed from command line arguments
+    private static long startTime = 0;
+    private static int numFiles = 0;
+    private static long bytesPerBlock = 1;
+    private static long blocksPerFile = 0;
+    private static long bytesPerFile = 1;
+    private static Path baseDir = null;
+    
+    // variables initialized in main()
+    private static FileSystem fileSys = null;
+    private static Path taskDir = null;
+    private static String uniqueId = null;
+    private static byte[] buffer;
+    
+    /**
+     * Returns when the current number of seconds from the epoch equals
+     * the command line argument given by <code>-startTime</code>.
+     * This allows multiple instances of this program, running on clock
+     * synchronized nodes, to start at roughly the same time.
+     */
+    static void barrier() {
+      long sleepTime;
+      while ((sleepTime = startTime - System.currentTimeMillis()) > 0) {
+        try {
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException ex) {
+        }
       }
     }
     
     /**
-     * Given a number of files to create, create and open those files
-     * for both writing and reading a given number of bytes.
+     * Create and write to a given number of files.  Repeat each remote
+     * operation until is suceeds (does not throw an exception).
+     *
+     * @return the number of exceptions caught
      */
-    public void map(WritableComparable key, 
-                    Writable value,
-                    OutputCollector output, 
-                    Reporter reporter) throws IOException {
-      int nFiles = ((IntWritable) value).get();
-      Path taskDir = new Path(topDir, taskId);
-      if (!fileSys.mkdirs(taskDir)) {
-        throw new IOException("Mkdirs failed to create " + taskDir.toString());
-      }
-      byte[] buffer = new byte[32768];
-      for (int index = 0; index < nFiles; index++) {
-        FSDataOutputStream out = fileSys.create(
-            new Path(taskDir, Integer.toString(index)));
-        int toBeWritten = numBytesToWrite;
+    static int createWrite() {
+      int exceptions = 0;
+      FSOutputStream out = null;
+      boolean success = false;
+      for (int index = 0; index < numFiles; index++) {
+        do { // create file until is succeeds
+          try {
+              out = fileSys.createRaw(
+              new Path(taskDir, "" + index), false, (short)1, bytesPerBlock);
+            success = true;
+          } catch (IOException ioe) { success=false; exceptions++; }
+        } while (!success);
+        long toBeWritten = bytesPerFile;
         while (toBeWritten > 0) {
-          int nbytes = Math.min(buffer.length, toBeWritten);
-          randomizeBytes(buffer, 0, nbytes);
+          int nbytes = (int) Math.min(buffer.length, toBeWritten);
           toBeWritten -= nbytes;
-          out.write(buffer, 0, nbytes);
-          reporter.setStatus("wrote " + (numBytesToWrite-toBeWritten) +
-              " bytes for "+ index +"th file.");
+          try { // only try once
+            out.write(buffer, 0, nbytes);
+          } catch (IOException ioe) {
+            exceptions++;
+          }
         }
-        out.close();
+        do { // close file until is succeeds
+          try {
+            out.close();
+            success = true;
+          } catch (IOException ioe) { success=false; exceptions++; }
+        } while (!success);
       }
-      for (int index = 0; index < nFiles; index++) {
-        FSDataInputStream in = fileSys.open(
-            new Path(taskDir, Integer.toString(index)));
-        int toBeRead = numBytesToWrite;
-        while (toBeRead > 0) {
-          int nbytes = Math.min(buffer.length, toBeRead);
-          toBeRead -= nbytes;
-          in.read(buffer, 0, nbytes);
-          reporter.setStatus("read " + (numBytesToWrite-toBeRead) +
-              " bytes for "+ index +"th file.");
+      return exceptions;
+    }
+    
+    /**
+     * Open and read a given number of files.
+     *
+     * @return the number of exceptions caught
+     */
+    static int openRead() {
+      int exceptions = 0;
+      FSInputStream in = null;
+      for (int index = 0; index < numFiles; index++) {
+        try {
+          in = fileSys.openRaw(new Path(taskDir, "" + index));
+          long toBeRead = bytesPerFile;
+          while (toBeRead > 0) {
+            int nbytes = (int) Math.min(buffer.length, toBeRead);
+            toBeRead -= nbytes;
+            try { // only try once
+              in.read(buffer, 0, nbytes);
+            } catch (IOException ioe) {
+              exceptions++;
+            }
+          }
+          in.close();
+        } catch (IOException ioe) { 
+          exceptions++; 
         }
-        in.close();
       }
-      fileSys.delete(taskDir); // clean up after yourself
-     }
+      return exceptions;
+    }
+    
+    /**
+     * Rename a given number of files.  Repeat each remote
+     * operation until is suceeds (does not throw an exception).
+     *
+     * @return the number of exceptions caught
+     */
+    static int rename() {
+      int exceptions = 0;
+      boolean success = false;
+      for (int index = 0; index < numFiles; index++) {
+        do { // rename file until is succeeds
+          try {
+            boolean result = fileSys.renameRaw(
+              new Path(taskDir, "" + index), new Path(taskDir, "A" + index));
+            success = true;
+          } catch (IOException ioe) { success=false; exceptions++; }
+        } while (!success);
+      }
+      return exceptions;
+    }
     
     /**
-     * Save the values out of the configuaration that we need to write
-     * the data.
+     * Delete a given number of files.  Repeat each remote
+     * operation until is suceeds (does not throw an exception).
+     *
+     * @return the number of exceptions caught
      */
-    public void configure(JobConf job) {
-      try {
-        fileSys = FileSystem.get(job);
-      } catch (IOException e) {
-        throw new RuntimeException("Can't get default file system", e);
-      }
-      numBytesToWrite = job.getInt("test.nnbench.bytes_per_file", 0);
-      topDir = new Path(job.get("test.nnbench.topdir", "/nnbench"));
-      taskId = job.get("mapred.task.id", (new Long(random.nextLong())).toString());
-    }
-    
-  }
-  
-  public void reduce(WritableComparable key, 
-                     Iterator values,
-                     OutputCollector output, 
-                     Reporter reporter) throws IOException {
-    // nothing
-  }
-  
+    static int delete() {
+      int exceptions = 0;
+      boolean success = false;
+      for (int index = 0; index < numFiles; index++) {
+        do { // delete file until is succeeds
+          try {
+            boolean result = fileSys.deleteRaw(new Path(taskDir, "A" + index));
+            success = true;
+          } catch (IOException ioe) { success=false; exceptions++; }
+        } while (!success);
+      }
+      return exceptions;
+    }
+    
   /**
-   * This is the main routine for launching a distributed namenode stress test.
-   * It runs 10 maps/node.  The reduce doesn't do anything.
-   * 
-   * @throws IOException 
+   * This launches a given namenode operation (<code>-operation</code>),
+   * starting at a given time (<code>-startTime</code>).  The files used
+   * by the openRead, rename, and delete operations are the same files
+   * created by the createWrite operation.  Typically, the program
+   * would be run four times, once for each operation in this order:
+   * createWrite, openRead, rename, delete.
+   *
+   * <pre>
+   * Usage: nnbench 
+   *          -operation <one of createWrite, openRead, rename, or delete>
+   *          -baseDir <base output/input DFS path>
+   *          -startTime <time to start, given in seconds from the epoch>
+   *          -numFiles <number of files to create, read, rename, or delete>
+   *          -blocksPerFile <number of blocks to create per file>
+   *         [-bytesPerBlock <number of bytes to write to each block, default is 1>]
+   * </pre>
+   *
+   * @throws IOException indicates a problem with test startup
    */
   public static void main(String[] args) throws IOException {
-    Configuration defaults = new Configuration();
-    if (args.length != 3) {
-      System.out.println("Usage: nnbench <out-dir> <filesPerMap> <bytesPerFile>");
-      return;
-    }
-    Path outDir = new Path(args[0]);
-    int filesPerMap = Integer.parseInt(args[1]);
-    int numBytesPerFile = Integer.parseInt(args[2]);
-    
-    JobConf jobConf = new JobConf(defaults, NNBench.class);
-    jobConf.setJobName("nnbench");
-    jobConf.setInt("test.nnbench.bytes_per_file", numBytesPerFile);
-    jobConf.set("test.nnbench.topdir", args[0]);
-    
-    // turn off speculative execution, because DFS doesn't handle
-    // multiple writers to the same file.
-    jobConf.setSpeculativeExecution(false);
-    jobConf.setInputFormat(SequenceFileInputFormat.class);
-    jobConf.setOutputKeyClass(BytesWritable.class);
-    jobConf.setOutputValueClass(BytesWritable.class);
-    
-    jobConf.setMapperClass(Map.class);        
-    jobConf.setReducerClass(NNBench.class);
-    
-    JobClient client = new JobClient(jobConf);
-    ClusterStatus cluster = client.getClusterStatus();
-    int numMaps = cluster.getTaskTrackers() * 
-         jobConf.getInt("test.nnbench.maps_per_host", 10);
-    jobConf.setNumMapTasks(numMaps);
-    System.out.println("Running " + numMaps + " maps.");
-    jobConf.setNumReduceTasks(1);
-    
-    Path tmpDir = new Path("random-work");
-    Path inDir = new Path(tmpDir, "in");
-    Path fakeOutDir = new Path(tmpDir, "out");
-    FileSystem fileSys = FileSystem.get(jobConf);
-    if (fileSys.exists(outDir)) {
-      System.out.println("Error: Output directory " + outDir + 
-                         " already exists.");
-      return;
-    }
-    fileSys.delete(tmpDir);
-    if (!fileSys.mkdirs(inDir)) {
-      System.out.println("Error: Mkdirs failed to create " + 
-                         inDir.toString());
-      return;
-    }
-
-    for(int i=0; i < numMaps; ++i) {
-      Path file = new Path(inDir, "part"+i);
-      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys,
-                                jobConf, file,
-                                IntWritable.class, IntWritable.class,
-                                CompressionType.NONE,
-                                (Progressable)null);
-      writer.append(new IntWritable(0), new IntWritable(filesPerMap));
-      writer.close();
+    String version = "NameNodeBenchmark.0.3";
+    System.out.println(version);
+    
+    String usage =
+      "Usage: nnbench " +
+      "-operation <one of createWrite, openRead, rename, or delete> " +
+      "-baseDir <base output/input DFS path> " +
+      "-startTime <time to start, given in seconds from the epoch> " +
+      "-numFiles <number of files to create> " +
+      "-blocksPerFile <number of blocks to create per file> " +
+      "[-bytesPerBlock <number of bytes to write to each block, default is 1>]";
+    
+    String operation = null;
+    for (int i = 0; i < args.length; i++) { // parse command line
+      if (args[i].equals("-baseDir")) {
+        baseDir = new Path(args[++i]);
+      } else if (args[i].equals("-numFiles")) {
+        numFiles = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-blocksPerFile")) {
+        blocksPerFile = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-bytesPerBlock")) {
+        bytesPerBlock = Long.parseLong(args[++i]);
+      } else if (args[i].equals("-startTime")) {
+        startTime = Long.parseLong(args[++i]) * 1000;
+      } else if (args[i].equals("-operation")) {
+        operation = args[++i];
+      } else {
+        System.out.println(usage);
+        System.exit(-1);
+      }
     }
-    jobConf.setInputPath(inDir);
-    jobConf.setOutputPath(fakeOutDir);
+    bytesPerFile = bytesPerBlock * blocksPerFile;
     
-    // Uncomment to run locally in a single process
-    //job_conf.set("mapred.job.tracker", "local");
+    System.out.println("Inputs: ");
+    System.out.println("   operation: " + operation);
+    System.out.println("   baseDir: " + baseDir);
+    System.out.println("   startTime: " + startTime);
+    System.out.println("   numFiles: " + numFiles);
+    System.out.println("   blocksPerFile: " + blocksPerFile);
+    System.out.println("   bytesPerBlock: " + bytesPerBlock);
+    
+    if (operation == null ||  // verify args
+      baseDir == null ||
+      numFiles < 1 ||
+      blocksPerFile < 1 ||
+      bytesPerBlock < 0) 
+    {
+      System.err.println(usage);
+      System.exit(-1);
+    }
     
-    Date startTime = new Date();
+    JobConf jobConf = new JobConf(new Configuration(), NNBench.class);
+    fileSys = FileSystem.get(jobConf);
+    uniqueId = java.net.InetAddress.getLocalHost().getHostName();
+    taskDir = new Path(baseDir, uniqueId);
+    // initialize buffer used for writing/reading file
+    buffer = new byte[(int) Math.min(bytesPerFile, 32768L)];
+    
+    Date execTime;
+    Date endTime;
+    long duration;
+    int exceptions = 0;
+    barrier(); // wait for coordinated start time
+    execTime = new Date();
     System.out.println("Job started: " + startTime);
-    try {
-      JobClient.runJob(jobConf);
-      Date endTime = new Date();
-      System.out.println("Job ended: " + endTime);
-      System.out.println("The job took " + 
-         (endTime.getTime() - startTime.getTime()) /1000 + " seconds.");
-    } finally {
-      fileSys.delete(tmpDir);
+    if (operation.equals("createWrite")) {
+      if (!fileSys.mkdirs(taskDir)) {
+        throw new IOException("Mkdirs failed to create " + taskDir.toString());
+      }
+      exceptions = createWrite();
+    } else if (operation.equals("openRead")) {
+      exceptions = openRead();
+    } else if (operation.equals("rename")) {
+      exceptions = rename();
+    } else if (operation.equals("delete")) {
+      exceptions = delete();
+    } else {
+      System.err.println(usage);
+      System.exit(-1);
     }
+    endTime = new Date();
+    System.out.println("Job ended: " + endTime);
+    duration = (endTime.getTime() - execTime.getTime()) /1000;
+    System.out.println("The " + operation + " job took " + duration + " seconds.");
+    System.out.println("The job recorded " + exceptions + " exceptions.");
   }
 }