You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/01/12 19:19:12 UTC

[30/34] hadoop git commit: HADOOP-9992. Modify the NN loadGenerator to optionally run as a MapReduce job. Contributed by Akshay Radia

HADOOP-9992. Modify the NN loadGenerator to optionally run as a MapReduce job. Contributed by Akshay Radia


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/63947cce
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/63947cce
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/63947cce

Branch: refs/heads/HDFS-EC
Commit: 63947ccedb9f84fd39822c6152cbeaf4dca37f28
Parents: bc251b7
Author: Brandon Li <br...@apache.org>
Authored: Fri Jan 9 17:24:22 2015 -0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Jan 12 10:18:02 2015 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../hadoop/fs/loadGenerator/LoadGenerator.java  | 343 ++++++++-----
 .../fs/loadGenerator/LoadGeneratorMR.java       | 483 +++++++++++++++++++
 .../apache/hadoop/test/MapredTestDriver.java    |  12 +
 4 files changed, 719 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/63947cce/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 37a451e..90bfe3e 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -469,6 +469,9 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11464. Reinstate support for launching Hadoop processes on Windows
     using Cygwin. (cnauroth)
 
+    HADOOP-9992. Modify the NN loadGenerator to optionally run as a MapReduce job
+    (Akshay Radia via brandonli)
+
   OPTIMIZATIONS
 
     HADOOP-11323. WritableComparator#compare keeps reference to byte array.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63947cce/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java
index 994b9b2..ca01702 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java
@@ -19,10 +19,12 @@
 package org.apache.hadoop.fs.loadGenerator;
 
 import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -36,10 +38,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options.CreateOpts;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -48,8 +51,11 @@ import org.apache.hadoop.util.ToolRunner;
 import com.google.common.base.Preconditions;
 
 /** The load generator is a tool for testing NameNode behavior under
- * different client loads.
- * It allows the user to generate different mixes of read, write,
+ * different client loads. Note there is a subclass of this clas that lets 
+ * you run a the load generator as a MapReduce job (see LoadGeneratorMR in the 
+ * MapReduce project.
+ * 
+ * The loadGenerator allows the user to generate different mixes of read, write,
  * and list requests by specifying the probabilities of read and
  * write. The user controls the intensity of the load by
  * adjusting parameters for the number of worker threads and the delay
@@ -58,15 +64,24 @@ import com.google.common.base.Preconditions;
  * generator exits, it print some NameNode statistics like the average
  * execution time of each kind of operations and the NameNode
  * throughput.
+ *
+ * The program can run in one of two forms. As a regular single process command
+ * that runs multiple threads to generate load on the NN or as a Map Reduce
+ * program that runs multiple (multi-threaded) map tasks that generate load
+ * on the NN; the results summary is generated by a single reduce task.
+ * 
  * 
  * The user may either specify constant duration, read and write 
  * probabilities via the command line, or may specify a text file
  * that acts as a script of which read and write probabilities to
- * use for specified durations.
+ * use for specified durations. If no duration is specified the program
+ * runs till killed (duration required if run as MapReduce).
  * 
  * The script takes the form of lines of duration in seconds, read
  * probability and write probability, each separated by white space.
- * Blank lines and lines starting with # (comments) are ignored.
+ * Blank lines and lines starting with # (comments) are ignored. If load
+ * generator is run as a MapReduce program then the script file needs to be
+ * accessible on the the Map task as a HDFS file.
  * 
  * After command line argument parsing and data initialization,
  * the load generator spawns the number of worker threads 
@@ -116,31 +131,43 @@ import com.google.common.base.Preconditions;
 public class LoadGenerator extends Configured implements Tool {
   public static final Log LOG = LogFactory.getLog(LoadGenerator.class);
   
-  private volatile boolean shouldRun = true;
-  private Path root = DataGenerator.DEFAULT_ROOT;
-  private FileContext fc;
-  private int maxDelayBetweenOps = 0;
-  private int numOfThreads = 200;
-  private long [] durations = {0};
-  private double [] readProbs = {0.3333};
-  private double [] writeProbs = {0.3333};
-  private volatile int currentIndex = 0;
-  long totalTime = 0;
-  private long startTime = Time.now()+10000;
+  private volatile static boolean shouldRun = true;
+  protected static Path root = DataGenerator.DEFAULT_ROOT;
+  private static FileContext fc;
+  protected static int maxDelayBetweenOps = 0;
+  protected static int numOfThreads = 200;
+  protected static long [] durations = {0};
+  protected static double [] readProbs = {0.3333};
+  protected static double [] writeProbs = {0.3333};
+  private static volatile int currentIndex = 0;
+  protected static long totalTime = 0;
+  protected static long startTime = Time.now()+10000;
   final static private int BLOCK_SIZE = 10;
-  private ArrayList<String> files = new ArrayList<String>();  // a table of file names
-  private ArrayList<String> dirs = new ArrayList<String>(); // a table of directory names
-  private Random r = null;
-  final private static String USAGE = "java LoadGenerator\n" +
-  	"-readProbability <read probability>\n" +
-    "-writeProbability <write probability>\n" +
-    "-root <root>\n" +
-    "-maxDelayBetweenOps <maxDelayBetweenOpsInMillis>\n" +
-    "-numOfThreads <numOfThreads>\n" +
-    "-elapsedTime <elapsedTimeInSecs>\n" +
-    "-startTime <startTimeInMillis>\n" +
-    "-scriptFile <filename>";
-  final private String hostname;
+  private static ArrayList<String> files = new ArrayList<String>();  // a table of file names
+  private static ArrayList<String> dirs = new ArrayList<String>(); // a table of directory names
+  protected static Random r = null;
+  protected static long seed = 0;
+  protected static String scriptFile = null;
+  protected static final String FLAGFILE_DEFAULT = "/tmp/flagFile";
+  protected static Path flagFile = new Path(FLAGFILE_DEFAULT);
+  protected String hostname;
+  final private static String USAGE_CMD = "java LoadGenerator\n";
+  final protected static String USAGE_ARGS = 
+	  "-readProbability <read probability>\n" +
+      "-writeProbability <write probability>\n" +
+      "-root <root>\n" +
+      "-maxDelayBetweenOps <maxDelayBetweenOpsInMillis>\n" +
+      "-numOfThreads <numOfThreads>\n" +
+      "-elapsedTime <elapsedTimeInSecs>\n" +
+      "-startTime <startTimeInMillis>\n" +
+      "-scriptFile <filename>\n" +
+      "-flagFile <filename>";
+  final private static String USAGE = USAGE_CMD + USAGE_ARGS;
+  
+
+  
+
+
   private final byte[] WRITE_CONTENTS = new byte[4096];
 
   private static final int ERR_TEST_FAILED = 2;
@@ -151,15 +178,21 @@ public class LoadGenerator extends Configured implements Tool {
     hostname = addr.getHostName();
     Arrays.fill(WRITE_CONTENTS, (byte) 'a');
   }
+  
+  public LoadGenerator(Configuration conf) throws IOException, UnknownHostException {
+    this();
+    setConf(conf);
+  }
 
-  private final static int OPEN = 0;
-  private final static int LIST = 1;
-  private final static int CREATE = 2;
-  private final static int WRITE_CLOSE = 3;
-  private final static int DELETE = 4;
-  private final static int TOTAL_OP_TYPES =5;
-  private long [] executionTime = new long[TOTAL_OP_TYPES];
-  private long [] totalNumOfOps = new long[TOTAL_OP_TYPES];
+  protected final static int OPEN = 0;
+  protected final static int LIST = 1;
+  protected final static int CREATE = 2;
+  protected final static int WRITE_CLOSE = 3;
+  protected final static int DELETE = 4;
+  protected final static int TOTAL_OP_TYPES =5;
+  protected static long [] executionTime = new long[TOTAL_OP_TYPES];
+  protected static long [] numOfOps = new long[TOTAL_OP_TYPES];
+  protected static long totalOps = 0; // across all of types
   
   /** A thread sends a stream of requests to the NameNode.
    * At each iteration, it first decides if it is going to read a file,
@@ -192,7 +225,7 @@ public class LoadGenerator extends Configured implements Tool {
       this.id = id;
     }
     
-    /** Main loop
+    /** Main loop for each thread
      * Each iteration decides what's the next operation and then pauses.
      */
     @Override
@@ -295,7 +328,7 @@ public class LoadGenerator extends Configured implements Tool {
             CreateOpts.createParent(), CreateOpts.bufferSize(4096),
             CreateOpts.repFac((short) 3));
         executionTime[CREATE] += (Time.now() - startTime);
-        totalNumOfOps[CREATE]++;
+        numOfOps[CREATE]++;
 
         long i = fileSize;
         while (i > 0) {
@@ -306,28 +339,67 @@ public class LoadGenerator extends Configured implements Tool {
 
         startTime = Time.now();
         executionTime[WRITE_CLOSE] += (Time.now() - startTime);
-        totalNumOfOps[WRITE_CLOSE]++;
+        numOfOps[WRITE_CLOSE]++;
       } finally {
         IOUtils.cleanup(LOG, out);
       }
     }
   }
   
-  /** Main function:
+  /** Main function called by tool runner.
    * It first initializes data by parsing the command line arguments.
-   * It then starts the number of DFSClient threads as specified by
-   * the user.
-   * It stops all the threads when the specified elapsed time is passed.
-   * Before exiting, it prints the average execution for 
-   * each operation and operation throughput.
+   * It then calls the loadGenerator
    */
   @Override
   public int run(String[] args) throws Exception {
-    int exitCode = init(args);
+    int exitCode = parseArgs(false, args);
     if (exitCode != 0) {
       return exitCode;
     }
+    System.out.println("Running LoadGenerator against fileSystem: " + 
+    FileContext.getFileContext().getDefaultFileSystem().getUri());
+    exitCode = generateLoadOnNN();
+    printResults(System.out);
+    return exitCode;
+  }
+    
+  boolean stopFileCreated() {
+    try {
+      fc.getFileStatus(flagFile);
+    } catch (FileNotFoundException e) {
+      return false;
+    } catch (IOException e) {
+      LOG.error("Got error when checking if file exists:" + flagFile, e);
+    }
+    LOG.info("Flag file was created. Stopping the test.");
+    return true;
+  }
+  
+ /**
+  * This is the main function - run threads to generate load on NN
+  * It starts the number of DFSClient threads as specified by
+  * the user.
+  * It stops all the threads when the specified elapsed time is passed.
+  */
+  protected int generateLoadOnNN() throws InterruptedException {
+    int hostHashCode = hostname.hashCode();
+    if (seed == 0) {
+      r = new Random(System.currentTimeMillis()+hostHashCode);
+    } else {
+      r = new Random(seed+hostHashCode);
+    }
+    try {
+      fc = FileContext.getFileContext(getConf());
+    } catch (IOException ioe) {
+      System.err.println("Can not initialize the file system: " + 
+          ioe.getLocalizedMessage());
+      return -1;
+    }
     
+    int status = initFileDirTables();
+    if (status != 0) {
+      return status;
+    }
     barrier();
     
     DFSClientThread[] threads = new DFSClientThread[numOfThreads];
@@ -337,91 +409,99 @@ public class LoadGenerator extends Configured implements Tool {
     }
     
     if (durations[0] > 0) {
-      while(shouldRun) {
-        Thread.sleep(durations[currentIndex] * 1000);
-        totalTime += durations[currentIndex];
-        
-        // Are we on the final line of the script?
-        if( (currentIndex + 1) == durations.length) {
-          shouldRun = false;
-        } else {
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Moving to index " + currentIndex + ": r = "
-                + readProbs[currentIndex] + ", w = " + writeProbs
-                + " for duration " + durations[currentIndex]);
+      if (durations.length == 1) {// There is a fixed run time
+        while (shouldRun) {
+          Thread.sleep(2000);
+          totalTime += 2;
+          if (totalTime >= durations[0] || stopFileCreated()) {
+            shouldRun = false;
+          }
+        }
+      } else {
+        // script run
+
+        while (shouldRun) {
+          Thread.sleep(durations[currentIndex] * 1000);
+          totalTime += durations[currentIndex];
+          // Are we on the final line of the script?
+          if ((currentIndex + 1) == durations.length || stopFileCreated()) {
+            shouldRun = false;
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Moving to index " + currentIndex + ": r = "
+                  + readProbs[currentIndex] + ", w = " + writeProbs
+                  + " for duration " + durations[currentIndex]);
+            }
+            currentIndex++;
           }
-          currentIndex++;
         }
       }
-    } 
+    }
     
     if(LOG.isDebugEnabled()) {
       LOG.debug("Done with testing.  Waiting for threads to finish.");
     }
-
+    
     boolean failed = false;
     for (DFSClientThread thread : threads) {
       thread.join();
       for (int i=0; i<TOTAL_OP_TYPES; i++) {
         executionTime[i] += thread.executionTime[i];
-        totalNumOfOps[i] += thread.totalNumOfOps[i];
+        numOfOps[i] += thread.totalNumOfOps[i];
       }
       failed = failed || thread.failed;
     }
-
+    int exitCode = 0;
     if (failed) {
       exitCode = -ERR_TEST_FAILED;
     }
 
-    long totalOps = 0;
+    totalOps = 0;
     for (int i=0; i<TOTAL_OP_TYPES; i++) {
-      totalOps += totalNumOfOps[i];
+      totalOps += numOfOps[i];
     }
-    
-    if (totalNumOfOps[OPEN] != 0) {
-      System.out.println("Average open execution time: " + 
-          (double)executionTime[OPEN]/totalNumOfOps[OPEN] + "ms");
-    }
-    if (totalNumOfOps[LIST] != 0) {
-      System.out.println("Average list execution time: " + 
-          (double)executionTime[LIST]/totalNumOfOps[LIST] + "ms");
-    }
-    if (totalNumOfOps[DELETE] != 0) {
-      System.out.println("Average deletion execution time: " + 
-          (double)executionTime[DELETE]/totalNumOfOps[DELETE] + "ms");
-      System.out.println("Average create execution time: " + 
-          (double)executionTime[CREATE]/totalNumOfOps[CREATE] + "ms");
-      System.out.println("Average write_close execution time: " + 
-          (double)executionTime[WRITE_CLOSE]/totalNumOfOps[WRITE_CLOSE] + "ms");
-    }
-    if (durations[0] != 0) { 
-      System.out.println("Average operations per second: " + 
+    return exitCode;
+  }
+  
+  protected static void printResults(PrintStream out) throws UnsupportedFileSystemException {
+    out.println("Result of running LoadGenerator against fileSystem: " + 
+    FileContext.getFileContext().getDefaultFileSystem().getUri());
+    if (numOfOps[OPEN] != 0) {
+      out.println("Average open execution time: " + 
+          (double)executionTime[OPEN]/numOfOps[OPEN] + "ms");
+    }
+    if (numOfOps[LIST] != 0) {
+      out.println("Average list execution time: " + 
+          (double)executionTime[LIST]/numOfOps[LIST] + "ms");
+    }
+    if (numOfOps[DELETE] != 0) {
+      out.println("Average deletion execution time: " + 
+          (double)executionTime[DELETE]/numOfOps[DELETE] + "ms");
+      out.println("Average create execution time: " + 
+          (double)executionTime[CREATE]/numOfOps[CREATE] + "ms");
+      out.println("Average write_close execution time: " + 
+          (double)executionTime[WRITE_CLOSE]/numOfOps[WRITE_CLOSE] + "ms");
+    }
+    if (totalTime != 0) { 
+      out.println("Average operations per second: " + 
           (double)totalOps/totalTime +"ops/s");
     }
-    System.out.println();
-    return exitCode;
+    out.println();
   }
+    
 
   /** Parse the command line arguments and initialize the data */
-  private int init(String[] args) throws IOException {
-    try {
-      fc = FileContext.getFileContext(getConf());
-    } catch (IOException ioe) {
-      System.err.println("Can not initialize the file system: " + 
-          ioe.getLocalizedMessage());
-      return -1;
-    }
-    int hostHashCode = hostname.hashCode();
-    boolean scriptSpecified = false;
-    
-    try {
+  protected int parseArgs(boolean runAsMapReduce, String[] args) throws IOException {
+   try {
       for (int i = 0; i < args.length; i++) { // parse command line
         if (args[i].equals("-scriptFile")) {
-          if(loadScriptFile(args[++i]) == -1)
+          scriptFile = args[++i];
+          if (durations[0] > 0)  {
+            System.err.println("Can't specify elapsedTime and use script.");
             return -1;
-          scriptSpecified = true;
+          }
         } else if (args[i].equals("-readProbability")) {
-          if(scriptSpecified) {
+          if (scriptFile != null) {
             System.err.println("Can't specify probabilities and use script.");
             return -1;
           }
@@ -432,7 +512,7 @@ public class LoadGenerator extends Configured implements Tool {
             return -1;
           }
         } else if (args[i].equals("-writeProbability")) {
-          if(scriptSpecified) {
+          if (scriptFile != null) {
             System.err.println("Can't specify probabilities and use script.");
             return -1;
           }
@@ -456,14 +536,18 @@ public class LoadGenerator extends Configured implements Tool {
         } else if (args[i].equals("-startTime")) {
           startTime = Long.parseLong(args[++i]);
         } else if (args[i].equals("-elapsedTime")) {
-          if(scriptSpecified) {
+          if (scriptFile != null) {
             System.err.println("Can't specify elapsedTime and use script.");
             return -1;
           }
           durations[0] = Long.parseLong(args[++i]);
         } else if (args[i].equals("-seed")) {
-          r = new Random(Long.parseLong(args[++i])+hostHashCode);
-        } else {
+          seed = Long.parseLong(args[++i]);
+          r = new Random(seed);
+        }  else if (args[i].equals("-flagFile")) {
+          LOG.info("got flagFile:" + flagFile);
+          flagFile = new Path(args[++i]);
+        }else { 
           System.err.println(USAGE);
           ToolRunner.printGenericCommandUsage(System.err);
           return -1;
@@ -475,6 +559,12 @@ public class LoadGenerator extends Configured implements Tool {
       return -1;
     }
     
+    // Load Script File if not MR; for MR scriptFile is loaded by Mapper
+    if (!runAsMapReduce && scriptFile != null) { 
+      if(loadScriptFile(scriptFile, true) == -1)
+        return -1;
+    }
+    
     for(int i = 0; i < readProbs.length; i++) {
       if (readProbs[i] + writeProbs[i] <0 || readProbs[i]+ writeProbs[i] > 1) {
         System.err.println(
@@ -483,12 +573,7 @@ public class LoadGenerator extends Configured implements Tool {
         return -1;
       }
     }
-    
-    if (r==null) {
-      r = new Random(Time.now()+hostHashCode);
-    }
-    
-    return initFileDirTables();
+    return 0;
   }
 
   private static void parseScriptLine(String line, ArrayList<Long> duration,
@@ -527,9 +612,25 @@ public class LoadGenerator extends Configured implements Tool {
    * @return 0 if successful, -1 if not
    * @throws IOException if errors with file IO
    */
-  private int loadScriptFile(String filename) throws IOException  {
-    FileReader fr = new FileReader(new File(filename));
-    BufferedReader br = new BufferedReader(fr);
+  protected static int loadScriptFile(String filename, boolean readLocally) throws IOException {
+    
+    FileContext fc;
+    if (readLocally) { // read locally - program is run without MR
+      fc = FileContext.getLocalFSFileContext();
+    } else {
+      fc = FileContext.getFileContext(); // use default file system
+    }
+    DataInputStream in = null;
+    try {
+      in = fc.open(new Path(filename));
+    } catch (IOException e) {
+      System.err.println("Unable to open scriptFile: " + filename);
+
+      System.exit(-1);
+    } 
+    InputStreamReader inr = new InputStreamReader(in);
+    
+    BufferedReader br = new BufferedReader(inr);
     ArrayList<Long> duration  = new ArrayList<Long>();
     ArrayList<Double> readProb  = new ArrayList<Double>();
     ArrayList<Double> writeProb = new ArrayList<Double>();
@@ -619,7 +720,7 @@ public class LoadGenerator extends Configured implements Tool {
    * This allows multiple instances of this program, running on clock
    * synchronized nodes, to start at roughly the same time.
    */
-  private void barrier() {
+  private static void barrier() {
     long sleepTime;
     while ((sleepTime = startTime - Time.now()) > 0) {
       try {
@@ -635,9 +736,7 @@ public class LoadGenerator extends Configured implements Tool {
    * @throws Exception
    */
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(new Configuration(),
-        new LoadGenerator(), args);
+    int res = ToolRunner.run(new Configuration(), new LoadGenerator(), args);
     System.exit(res);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63947cce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGeneratorMR.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGeneratorMR.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGeneratorMR.java
new file mode 100644
index 0000000..c47d971
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGeneratorMR.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.loadGenerator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.UnknownHostException;
+import java.util.EnumSet;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+
+/** The load generator is a tool for testing NameNode behavior under
+ * different client loads.
+ * The main code is in HadoopCommon, @LoadGenerator. This class, LoadGeneratorMR
+ * lets you run that LoadGenerator as a MapReduce job.
+ * 
+ * The synopsis of the command is
+ * java LoadGeneratorMR
+ *   -mr <numMapJobs> <outputDir> : results in outputDir/Results
+ *   the rest of the args are the same as the original LoadGenerator.
+ *
+ */
+public class LoadGeneratorMR extends LoadGenerator {
+  public static final Log LOG = LogFactory.getLog(LoadGenerator.class);
+  private static int numMapTasks = 1;
+  private String mrOutDir;
+  
+  final private static String USAGE_CMD = "java LoadGeneratorMR\n";
+  final private static String USAGE = USAGE_CMD
+		  + "-mr <numMapJobs> <outputDir> [MUST be first 3 args] \n" + USAGE_ARGS ;
+  
+  // Constant "keys" used to communicate between map and reduce
+  final private static Text OPEN_EXECTIME = new Text("OpenExecutionTime");
+  final private static Text NUMOPS_OPEN = new Text("NumOpsOpen");
+  final private static Text LIST_EXECTIME = new Text("ListExecutionTime");
+  final private static Text NUMOPS_LIST = new Text("NumOpsList");
+  final private static Text DELETE_EXECTIME = new Text("DeletionExecutionTime");
+  final private static Text NUMOPS_DELETE = new Text("NumOpsDelete");
+  final private static Text CREATE_EXECTIME = new Text("CreateExecutionTime");
+  final private static Text NUMOPS_CREATE = new Text("NumOpsCreate");
+  final private static Text WRITE_CLOSE_EXECTIME = new Text("WriteCloseExecutionTime");
+  final private static Text NUMOPS_WRITE_CLOSE = new Text("NumOpsWriteClose");
+  final private static Text ELAPSED_TIME = new Text("ElapsedTime");
+  final private static Text TOTALOPS = new Text("TotalOps");
+  
+  // Config keys to pass args from Main to the Job
+  final private static String LG_ROOT = "LG.root";
+  final private static String LG_SCRIPTFILE = "LG.scriptFile";
+  final private static String LG_MAXDELAYBETWEENOPS = "LG.maxDelayBetweenOps";
+  final private static String LG_NUMOFTHREADS = "LG.numOfThreads";
+  final private static String LG_READPR = "LG.readPr";
+  final private static String LG_WRITEPR = "LG.writePr";
+  final private static String LG_SEED = "LG.r";
+  final private static String LG_NUMMAPTASKS = "LG.numMapTasks";
+  final private static String LG_ELAPSEDTIME = "LG.elapsedTime";
+  final private static String LG_STARTTIME = "LG.startTime";
+  final private static String LG_FLAGFILE = "LG.flagFile";
+
+
+  /** Constructor */
+  public LoadGeneratorMR() throws IOException, UnknownHostException {
+	super();
+  }
+  
+  public LoadGeneratorMR(Configuration conf) throws IOException, UnknownHostException {
+    this();
+    setConf(conf);
+  } 
+  
+  /** Main function called by tool runner.
+   * It first initializes data by parsing the command line arguments.
+   * It then calls the loadGenerator
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    int exitCode = parseArgsMR(args);
+    if (exitCode != 0) {
+      return exitCode;
+    }
+    System.out.println("Running LoadGeneratorMR against fileSystem: " + 
+    FileContext.getFileContext().getDefaultFileSystem().getUri());
+
+    return submitAsMapReduce(); // reducer will print the results
+  }
+
+
+  /** 
+   * Parse the command line arguments and initialize the data.
+   * Only parse the first arg: -mr <numMapTasks> <mrOutDir> (MUST be first three Args)
+   * The rest are parsed by the Parent LoadGenerator
+   **/
+ 
+	private int parseArgsMR(String[] args) throws IOException {
+	  try {
+		if (args.length >= 3 && args[0].equals("-mr")) {
+		  numMapTasks = Integer.parseInt(args[1]);
+		  mrOutDir = args[2];
+		  if (mrOutDir.startsWith("-")) {
+			System.err.println("Missing output file parameter, instead got: "
+							+ mrOutDir);
+			System.err.println(USAGE);
+			return -1;
+		  }
+		} else {
+		  System.err.println(USAGE);
+		  ToolRunner.printGenericCommandUsage(System.err);
+		  return -1;
+		}
+		String[] strippedArgs = new String[args.length - 3];
+		for (int i = 0; i < strippedArgs.length; i++) {
+		  strippedArgs[i] = args[i + 3];
+		}
+		super.parseArgs(true, strippedArgs); // Parse normal LoadGenerator args
+	  } catch (NumberFormatException e) {
+		System.err.println("Illegal parameter: " + e.getLocalizedMessage());
+		System.err.println(USAGE);
+		return -1;
+	  }
+	  return 0;
+	}
+
+  /** Main program
+   * 
+   * @param args command line arguments
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new LoadGeneratorMR(), args);
+    System.exit(res);
+  }
+
+  
+  // The following methods are only used when LoadGenerator is run a MR job
+  /**
+   * Based on args we submit the LoadGenerator as MR job.
+   * Number of MapTasks is numMapTasks
+   * @return exitCode for job submission
+   */
+  private int submitAsMapReduce() {
+    
+    System.out.println("Running as a MapReduce job with " + 
+        numMapTasks + " mapTasks;  Output to file " + mrOutDir);
+
+
+    Configuration conf = new Configuration(getConf());
+    
+    // First set all the args of LoadGenerator as Conf vars to pass to MR tasks
+
+    conf.set(LG_ROOT , root.toString());
+    conf.setInt(LG_MAXDELAYBETWEENOPS, maxDelayBetweenOps);
+    conf.setInt(LG_NUMOFTHREADS, numOfThreads);
+    conf.set(LG_READPR, readProbs[0]+""); //Pass Double as string
+    conf.set(LG_WRITEPR, writeProbs[0]+""); //Pass Double as string
+    conf.setLong(LG_SEED, seed); //No idea what this is
+    conf.setInt(LG_NUMMAPTASKS, numMapTasks);
+    if (scriptFile == null && durations[0] <=0) {
+      System.err.println("When run as a MapReduce job, elapsed Time or ScriptFile must be specified");
+      System.exit(-1);
+    }
+    conf.setLong(LG_ELAPSEDTIME, durations[0]);
+    conf.setLong(LG_STARTTIME, startTime); 
+    if (scriptFile != null) {
+      conf.set(LG_SCRIPTFILE , scriptFile);
+    }
+    conf.set(LG_FLAGFILE, flagFile.toString());
+    
+    // Now set the necessary conf variables that apply to run MR itself.
+    JobConf jobConf = new JobConf(conf, LoadGenerator.class);
+    jobConf.setJobName("NNLoadGeneratorViaMR");
+    jobConf.setNumMapTasks(numMapTasks);
+    jobConf.setNumReduceTasks(1); // 1 reducer to collect the results
+
+    jobConf.setOutputKeyClass(Text.class);
+    jobConf.setOutputValueClass(IntWritable.class);
+
+    jobConf.setMapperClass(MapperThatRunsNNLoadGenerator.class);
+    jobConf.setReducerClass(ReducerThatCollectsLGdata.class);
+
+    jobConf.setInputFormat(DummyInputFormat.class);
+    jobConf.setOutputFormat(TextOutputFormat.class);
+    
+    // Explicitly set number of max map attempts to 1.
+    jobConf.setMaxMapAttempts(1);
+    // Explicitly turn off speculative execution
+    jobConf.setSpeculativeExecution(false);
+
+    // This mapReduce job has no input but has output
+    FileOutputFormat.setOutputPath(jobConf, new Path(mrOutDir));
+
+    try {
+      JobClient.runJob(jobConf);
+    } catch (IOException e) {
+      System.err.println("Failed to run job: " + e.getMessage());
+      return -1;
+    }
+    return 0;
+    
+  }
+
+  
+  // Each split is empty
+  public static class EmptySplit implements InputSplit {
+    public void write(DataOutput out) throws IOException {}
+    public void readFields(DataInput in) throws IOException {}
+    public long getLength() {return 0L;}
+    public String[] getLocations() {return new String[0];}
+  }
+
+  // Dummy Input format to send 1 record - number of spits is numMapTasks
+  public static class DummyInputFormat extends Configured implements
+      InputFormat<LongWritable, Text> {
+
+    public InputSplit[] getSplits(JobConf conf, int numSplits) {
+      numSplits = conf.getInt("LG.numMapTasks", 1);
+      InputSplit[] ret = new InputSplit[numSplits];
+      for (int i = 0; i < numSplits; ++i) {
+        ret[i] = new EmptySplit();
+      }
+      return ret;
+    }
+
+    public RecordReader<LongWritable, Text> getRecordReader(
+        InputSplit ignored, JobConf conf, Reporter reporter) throws IOException {
+
+      return new RecordReader<LongWritable, Text>() {
+
+        boolean sentOneRecord = false;
+
+        public boolean next(LongWritable key, Text value)
+            throws IOException {
+          key.set(1);
+          value.set("dummy");
+          if (sentOneRecord == false) { // first call
+            sentOneRecord = true;
+            return true;
+          }
+          return false; // we have sent one record - we are done
+        }
+
+        public LongWritable createKey() {
+          return new LongWritable();
+        }
+        public Text createValue() {
+          return new Text();
+        }
+        public long getPos() throws IOException {
+          return 1;
+        }
+        public void close() throws IOException {
+        }
+        public float getProgress() throws IOException {
+          return 1;
+        }
+      };
+    }
+  }
+
+  public static class MapperThatRunsNNLoadGenerator extends MapReduceBase
+      implements Mapper<LongWritable, Text, Text, IntWritable> {
+
+    private JobConf jobConf;
+
+    @Override
+    public void configure(JobConf job) {
+      this.jobConf = job;
+      getArgsFromConfiguration(jobConf);
+    }
+
+    private class ProgressThread extends Thread {
+
+      boolean keepGoing; // while this is true, thread runs.
+      private Reporter reporter;
+
+      public ProgressThread(final Reporter r) {
+        this.reporter = r;
+        this.keepGoing = true;
+      }
+
+      public void run() {
+        while (keepGoing) {
+          if (!ProgressThread.interrupted()) {
+            try {
+              sleep(30 * 1000);
+            } catch (InterruptedException e) {
+            }
+          }
+          reporter.progress();
+        }
+      }
+    }
+
+    public void map(LongWritable key, Text value,
+        OutputCollector<Text, IntWritable> output, Reporter reporter)
+        throws IOException {
+      ProgressThread progressThread = new ProgressThread(reporter);
+      progressThread.start();
+      try {
+        new LoadGenerator(jobConf).generateLoadOnNN();
+        System.out
+            .println("Finished generating load on NN, sending results to the reducer");
+        printResults(System.out);
+        progressThread.keepGoing = false;
+        progressThread.join();
+
+        // Send results to Reducer
+        output.collect(OPEN_EXECTIME,
+            new IntWritable((int) executionTime[OPEN]));
+        output.collect(NUMOPS_OPEN, new IntWritable((int) numOfOps[OPEN]));
+
+        output.collect(LIST_EXECTIME,
+            new IntWritable((int) executionTime[LIST]));
+        output.collect(NUMOPS_LIST, new IntWritable((int) numOfOps[LIST]));
+
+        output.collect(DELETE_EXECTIME, new IntWritable(
+            (int) executionTime[DELETE]));
+        output.collect(NUMOPS_DELETE, new IntWritable((int) numOfOps[DELETE]));
+
+        output.collect(CREATE_EXECTIME, new IntWritable(
+            (int) executionTime[CREATE]));
+        output.collect(NUMOPS_CREATE, new IntWritable((int) numOfOps[CREATE]));
+
+        output.collect(WRITE_CLOSE_EXECTIME, new IntWritable(
+            (int) executionTime[WRITE_CLOSE]));
+        output.collect(NUMOPS_WRITE_CLOSE, new IntWritable(
+            (int) numOfOps[WRITE_CLOSE]));
+
+        output.collect(TOTALOPS, new IntWritable((int) totalOps));
+        output.collect(ELAPSED_TIME, new IntWritable((int) totalTime));
+
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    public void getArgsFromConfiguration(Configuration conf) {
+
+      maxDelayBetweenOps = conf.getInt(LG_MAXDELAYBETWEENOPS,
+          maxDelayBetweenOps);
+      numOfThreads = conf.getInt(LG_NUMOFTHREADS, numOfThreads);
+      readProbs[0] = Double.parseDouble(conf.get(LG_READPR, readProbs[0] + ""));
+      writeProbs[0] = Double.parseDouble(conf.get(LG_WRITEPR, writeProbs[0]
+          + ""));
+      seed = conf.getLong(LG_SEED, seed);
+      numMapTasks = conf.getInt(LG_NUMMAPTASKS, numMapTasks);
+      root = new Path(conf.get(LG_ROOT, root.toString()));
+      durations[0] = conf.getLong(LG_ELAPSEDTIME, 0);
+      startTime = conf.getLong(LG_STARTTIME, 0);
+      scriptFile = conf.get(LG_SCRIPTFILE, null);
+      flagFile = new Path(conf.get(LG_FLAGFILE, FLAGFILE_DEFAULT));
+      if (durations[0] > 0 && scriptFile != null) {
+        System.err.println("Cannot specify both ElapsedTime and ScriptFile, exiting");
+        System.exit(-1);
+      }
+     
+      try {
+        if (scriptFile != null && loadScriptFile(scriptFile, false) < 0) {
+          System.err.println("Error in scriptFile, exiting");
+          System.exit(-1);
+        }
+      } catch (IOException e) {
+       System.err.println("Error loading script file " + scriptFile);
+        e.printStackTrace();
+      }
+      if (durations[0] <= 0) {
+        System.err.println("A duration of zero or less is not allowed when running via MapReduce.");
+        System.exit(-1);
+      }
+    }
+  }
+
+  public static class ReducerThatCollectsLGdata extends MapReduceBase implements
+      Reducer<Text, IntWritable, Text, IntWritable> {
+    private IntWritable result = new IntWritable();
+    private JobConf jobConf;
+    
+    @Override
+    public void configure(JobConf job) {
+      this.jobConf = job;
+    }
+    
+    @Override
+    public void reduce(Text key, Iterator<IntWritable> values,
+        OutputCollector<Text, IntWritable> output, Reporter reporter)
+        throws IOException {
+      int sum = 0;
+      while (values.hasNext()) {
+        sum += values.next().get();
+      }
+      if (key.equals(OPEN_EXECTIME)){
+        executionTime[OPEN] = sum;
+      } else if (key.equals(NUMOPS_OPEN)){
+        numOfOps[OPEN] = sum;
+      } else if (key.equals(LIST_EXECTIME)){
+        executionTime[LIST] = sum;
+      } else if (key.equals(NUMOPS_LIST)){
+        numOfOps[LIST] = sum;
+      } else if (key.equals(DELETE_EXECTIME)){
+        executionTime[DELETE] = sum;
+      } else if (key.equals(NUMOPS_DELETE)){
+        numOfOps[DELETE] = sum;
+      } else if (key.equals(CREATE_EXECTIME)){
+        executionTime[CREATE] = sum;
+      } else if (key.equals(NUMOPS_CREATE)){
+        numOfOps[CREATE] = sum;
+      } else if (key.equals(WRITE_CLOSE_EXECTIME)){
+        System.out.println(WRITE_CLOSE_EXECTIME + " = " + sum);
+        executionTime[WRITE_CLOSE]= sum;
+      } else if (key.equals(NUMOPS_WRITE_CLOSE)){
+        numOfOps[WRITE_CLOSE] = sum;
+      } else if (key.equals(TOTALOPS)){
+        totalOps = sum;
+      } else if (key.equals(ELAPSED_TIME)){
+        totalTime = sum;
+      }
+      result.set(sum);
+      output.collect(key, result);
+      // System.out.println("Key = " + key + " Sum is =" + sum);
+      // printResults(System.out);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Output the result to a file Results in the output dir
+      FileContext fc;
+      try {
+        fc = FileContext.getFileContext(jobConf);
+      } catch (IOException ioe) {
+        System.err.println("Can not initialize the file system: " + 
+            ioe.getLocalizedMessage());
+        return;
+      }
+      FSDataOutputStream o = fc.create(FileOutputFormat.getTaskOutputPath(jobConf, "Results"),
+          EnumSet.of(CreateFlag.CREATE));
+         
+      PrintStream out = new PrintStream(o);
+      printResults(out);
+      out.close();
+      o.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/63947cce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
index f2cd53c..b1dfe56 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
@@ -41,6 +41,10 @@ import org.apache.hadoop.fs.DFSCIOTest;
 import org.apache.hadoop.fs.DistributedFSCheck;
 import org.apache.hadoop.io.FileBench;
 import org.apache.hadoop.fs.JHLogAnalyzer;
+import org.apache.hadoop.fs.loadGenerator.DataGenerator;
+import org.apache.hadoop.fs.loadGenerator.LoadGenerator;
+import org.apache.hadoop.fs.loadGenerator.LoadGeneratorMR;
+import org.apache.hadoop.fs.loadGenerator.StructureGenerator;
 import org.apache.hadoop.fs.slive.SliveTest;
 
 /**
@@ -107,6 +111,14 @@ public class MapredTestDriver {
       "Single process HDFS and MR cluster.");
       pgd.addClass("largesorter", LargeSorter.class,
           "Large-Sort tester");
+      pgd.addClass("NNloadGenerator", LoadGenerator.class,
+              "Generate load on Namenode using NN loadgenerator run WITHOUT MR");
+      pgd.addClass("NNloadGeneratorMR", LoadGeneratorMR.class,
+          "Generate load on Namenode using NN loadgenerator run as MR job");
+      pgd.addClass("NNstructureGenerator", StructureGenerator.class,
+          "Generate the structure to be used by NNdataGenerator");
+      pgd.addClass("NNdataGenerator", DataGenerator.class,
+          "Generate the data to be used by NNloadGenerator");
     } catch(Throwable e) {
       e.printStackTrace();
     }