You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/01/12 19:19:12 UTC
[30/34] hadoop git commit: HADOOP-9992. Modify the NN loadGenerator
to optionally run as a MapReduce job. Contributed by Akshay Radia
HADOOP-9992. Modify the NN loadGenerator to optionally run as a MapReduce job. Contributed by Akshay Radia
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/63947cce
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/63947cce
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/63947cce
Branch: refs/heads/HDFS-EC
Commit: 63947ccedb9f84fd39822c6152cbeaf4dca37f28
Parents: bc251b7
Author: Brandon Li <br...@apache.org>
Authored: Fri Jan 9 17:24:22 2015 -0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Jan 12 10:18:02 2015 -0800
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../hadoop/fs/loadGenerator/LoadGenerator.java | 343 ++++++++-----
.../fs/loadGenerator/LoadGeneratorMR.java | 483 +++++++++++++++++++
.../apache/hadoop/test/MapredTestDriver.java | 12 +
4 files changed, 719 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63947cce/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 37a451e..90bfe3e 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -469,6 +469,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11464. Reinstate support for launching Hadoop processes on Windows
using Cygwin. (cnauroth)
+ HADOOP-9992. Modify the NN loadGenerator to optionally run as a MapReduce job
+ (Akshay Radia via brandonli)
+
OPTIMIZATIONS
HADOOP-11323. WritableComparator#compare keeps reference to byte array.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63947cce/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java
index 994b9b2..ca01702 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGenerator.java
@@ -19,10 +19,12 @@
package org.apache.hadoop.fs.loadGenerator;
import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@@ -36,10 +38,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options.CreateOpts;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
@@ -48,8 +51,11 @@ import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Preconditions;
/** The load generator is a tool for testing NameNode behavior under
- * different client loads.
- * It allows the user to generate different mixes of read, write,
+ * different client loads. Note there is a subclass of this class that lets
+ * you run the load generator as a MapReduce job (see LoadGeneratorMR in the
+ * MapReduce project).
+ *
+ * The loadGenerator allows the user to generate different mixes of read, write,
* and list requests by specifying the probabilities of read and
* write. The user controls the intensity of the load by
* adjusting parameters for the number of worker threads and the delay
@@ -58,15 +64,24 @@ import com.google.common.base.Preconditions;
* generator exits, it print some NameNode statistics like the average
* execution time of each kind of operations and the NameNode
* throughput.
+ *
+ * The program can run in one of two forms. As a regular single process command
+ * that runs multiple threads to generate load on the NN or as a Map Reduce
+ * program that runs multiple (multi-threaded) map tasks that generate load
+ * on the NN; the results summary is generated by a single reduce task.
+ *
*
* The user may either specify constant duration, read and write
* probabilities via the command line, or may specify a text file
* that acts as a script of which read and write probabilities to
- * use for specified durations.
+ * use for specified durations. If no duration is specified the program
+ * runs till killed (duration required if run as MapReduce).
*
* The script takes the form of lines of duration in seconds, read
* probability and write probability, each separated by white space.
- * Blank lines and lines starting with # (comments) are ignored.
+ * Blank lines and lines starting with # (comments) are ignored. If the load
+ * generator is run as a MapReduce program then the script file needs to be
+ * accessible to the Map task as an HDFS file.
*
* After command line argument parsing and data initialization,
* the load generator spawns the number of worker threads
@@ -116,31 +131,43 @@ import com.google.common.base.Preconditions;
public class LoadGenerator extends Configured implements Tool {
public static final Log LOG = LogFactory.getLog(LoadGenerator.class);
- private volatile boolean shouldRun = true;
- private Path root = DataGenerator.DEFAULT_ROOT;
- private FileContext fc;
- private int maxDelayBetweenOps = 0;
- private int numOfThreads = 200;
- private long [] durations = {0};
- private double [] readProbs = {0.3333};
- private double [] writeProbs = {0.3333};
- private volatile int currentIndex = 0;
- long totalTime = 0;
- private long startTime = Time.now()+10000;
+ private volatile static boolean shouldRun = true;
+ protected static Path root = DataGenerator.DEFAULT_ROOT;
+ private static FileContext fc;
+ protected static int maxDelayBetweenOps = 0;
+ protected static int numOfThreads = 200;
+ protected static long [] durations = {0};
+ protected static double [] readProbs = {0.3333};
+ protected static double [] writeProbs = {0.3333};
+ private static volatile int currentIndex = 0;
+ protected static long totalTime = 0;
+ protected static long startTime = Time.now()+10000;
final static private int BLOCK_SIZE = 10;
- private ArrayList<String> files = new ArrayList<String>(); // a table of file names
- private ArrayList<String> dirs = new ArrayList<String>(); // a table of directory names
- private Random r = null;
- final private static String USAGE = "java LoadGenerator\n" +
- "-readProbability <read probability>\n" +
- "-writeProbability <write probability>\n" +
- "-root <root>\n" +
- "-maxDelayBetweenOps <maxDelayBetweenOpsInMillis>\n" +
- "-numOfThreads <numOfThreads>\n" +
- "-elapsedTime <elapsedTimeInSecs>\n" +
- "-startTime <startTimeInMillis>\n" +
- "-scriptFile <filename>";
- final private String hostname;
+ private static ArrayList<String> files = new ArrayList<String>(); // a table of file names
+ private static ArrayList<String> dirs = new ArrayList<String>(); // a table of directory names
+ protected static Random r = null;
+ protected static long seed = 0;
+ protected static String scriptFile = null;
+ protected static final String FLAGFILE_DEFAULT = "/tmp/flagFile";
+ protected static Path flagFile = new Path(FLAGFILE_DEFAULT);
+ protected String hostname;
+ final private static String USAGE_CMD = "java LoadGenerator\n";
+ final protected static String USAGE_ARGS =
+ "-readProbability <read probability>\n" +
+ "-writeProbability <write probability>\n" +
+ "-root <root>\n" +
+ "-maxDelayBetweenOps <maxDelayBetweenOpsInMillis>\n" +
+ "-numOfThreads <numOfThreads>\n" +
+ "-elapsedTime <elapsedTimeInSecs>\n" +
+ "-startTime <startTimeInMillis>\n" +
+ "-scriptFile <filename>\n" +
+ "-flagFile <filename>";
+ final private static String USAGE = USAGE_CMD + USAGE_ARGS;
+
+
+
+
+
private final byte[] WRITE_CONTENTS = new byte[4096];
private static final int ERR_TEST_FAILED = 2;
@@ -151,15 +178,21 @@ public class LoadGenerator extends Configured implements Tool {
hostname = addr.getHostName();
Arrays.fill(WRITE_CONTENTS, (byte) 'a');
}
+
+ public LoadGenerator(Configuration conf) throws IOException, UnknownHostException {
+ this();
+ setConf(conf);
+ }
- private final static int OPEN = 0;
- private final static int LIST = 1;
- private final static int CREATE = 2;
- private final static int WRITE_CLOSE = 3;
- private final static int DELETE = 4;
- private final static int TOTAL_OP_TYPES =5;
- private long [] executionTime = new long[TOTAL_OP_TYPES];
- private long [] totalNumOfOps = new long[TOTAL_OP_TYPES];
+ protected final static int OPEN = 0;
+ protected final static int LIST = 1;
+ protected final static int CREATE = 2;
+ protected final static int WRITE_CLOSE = 3;
+ protected final static int DELETE = 4;
+ protected final static int TOTAL_OP_TYPES =5;
+ protected static long [] executionTime = new long[TOTAL_OP_TYPES];
+ protected static long [] numOfOps = new long[TOTAL_OP_TYPES];
+ protected static long totalOps = 0; // across all of types
/** A thread sends a stream of requests to the NameNode.
* At each iteration, it first decides if it is going to read a file,
@@ -192,7 +225,7 @@ public class LoadGenerator extends Configured implements Tool {
this.id = id;
}
- /** Main loop
+ /** Main loop for each thread
* Each iteration decides what's the next operation and then pauses.
*/
@Override
@@ -295,7 +328,7 @@ public class LoadGenerator extends Configured implements Tool {
CreateOpts.createParent(), CreateOpts.bufferSize(4096),
CreateOpts.repFac((short) 3));
executionTime[CREATE] += (Time.now() - startTime);
- totalNumOfOps[CREATE]++;
+ numOfOps[CREATE]++;
long i = fileSize;
while (i > 0) {
@@ -306,28 +339,67 @@ public class LoadGenerator extends Configured implements Tool {
startTime = Time.now();
executionTime[WRITE_CLOSE] += (Time.now() - startTime);
- totalNumOfOps[WRITE_CLOSE]++;
+ numOfOps[WRITE_CLOSE]++;
} finally {
IOUtils.cleanup(LOG, out);
}
}
}
- /** Main function:
+ /** Main function called by tool runner.
* It first initializes data by parsing the command line arguments.
- * It then starts the number of DFSClient threads as specified by
- * the user.
- * It stops all the threads when the specified elapsed time is passed.
- * Before exiting, it prints the average execution for
- * each operation and operation throughput.
+ * It then calls the loadGenerator
*/
@Override
public int run(String[] args) throws Exception {
- int exitCode = init(args);
+ int exitCode = parseArgs(false, args);
if (exitCode != 0) {
return exitCode;
}
+ System.out.println("Running LoadGenerator against fileSystem: " +
+ FileContext.getFileContext().getDefaultFileSystem().getUri());
+ exitCode = generateLoadOnNN();
+ printResults(System.out);
+ return exitCode;
+ }
+
+ boolean stopFileCreated() {
+ try {
+ fc.getFileStatus(flagFile);
+ } catch (FileNotFoundException e) {
+ return false;
+ } catch (IOException e) {
+ LOG.error("Got error when checking if file exists:" + flagFile, e);
+ }
+ LOG.info("Flag file was created. Stopping the test.");
+ return true;
+ }
+
+ /**
+ * This is the main function - run threads to generate load on NN
+ * It starts the number of DFSClient threads as specified by
+ * the user.
+ * It stops all the threads when the specified elapsed time is passed.
+ */
+ protected int generateLoadOnNN() throws InterruptedException {
+ int hostHashCode = hostname.hashCode();
+ if (seed == 0) {
+ r = new Random(System.currentTimeMillis()+hostHashCode);
+ } else {
+ r = new Random(seed+hostHashCode);
+ }
+ try {
+ fc = FileContext.getFileContext(getConf());
+ } catch (IOException ioe) {
+ System.err.println("Can not initialize the file system: " +
+ ioe.getLocalizedMessage());
+ return -1;
+ }
+ int status = initFileDirTables();
+ if (status != 0) {
+ return status;
+ }
barrier();
DFSClientThread[] threads = new DFSClientThread[numOfThreads];
@@ -337,91 +409,99 @@ public class LoadGenerator extends Configured implements Tool {
}
if (durations[0] > 0) {
- while(shouldRun) {
- Thread.sleep(durations[currentIndex] * 1000);
- totalTime += durations[currentIndex];
-
- // Are we on the final line of the script?
- if( (currentIndex + 1) == durations.length) {
- shouldRun = false;
- } else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Moving to index " + currentIndex + ": r = "
- + readProbs[currentIndex] + ", w = " + writeProbs
- + " for duration " + durations[currentIndex]);
+ if (durations.length == 1) {// There is a fixed run time
+ while (shouldRun) {
+ Thread.sleep(2000);
+ totalTime += 2;
+ if (totalTime >= durations[0] || stopFileCreated()) {
+ shouldRun = false;
+ }
+ }
+ } else {
+ // script run
+
+ while (shouldRun) {
+ Thread.sleep(durations[currentIndex] * 1000);
+ totalTime += durations[currentIndex];
+ // Are we on the final line of the script?
+ if ((currentIndex + 1) == durations.length || stopFileCreated()) {
+ shouldRun = false;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Moving to index " + currentIndex + ": r = "
+ + readProbs[currentIndex] + ", w = " + writeProbs
+ + " for duration " + durations[currentIndex]);
+ }
+ currentIndex++;
}
- currentIndex++;
}
}
- }
+ }
if(LOG.isDebugEnabled()) {
LOG.debug("Done with testing. Waiting for threads to finish.");
}
-
+
boolean failed = false;
for (DFSClientThread thread : threads) {
thread.join();
for (int i=0; i<TOTAL_OP_TYPES; i++) {
executionTime[i] += thread.executionTime[i];
- totalNumOfOps[i] += thread.totalNumOfOps[i];
+ numOfOps[i] += thread.totalNumOfOps[i];
}
failed = failed || thread.failed;
}
-
+ int exitCode = 0;
if (failed) {
exitCode = -ERR_TEST_FAILED;
}
- long totalOps = 0;
+ totalOps = 0;
for (int i=0; i<TOTAL_OP_TYPES; i++) {
- totalOps += totalNumOfOps[i];
+ totalOps += numOfOps[i];
}
-
- if (totalNumOfOps[OPEN] != 0) {
- System.out.println("Average open execution time: " +
- (double)executionTime[OPEN]/totalNumOfOps[OPEN] + "ms");
- }
- if (totalNumOfOps[LIST] != 0) {
- System.out.println("Average list execution time: " +
- (double)executionTime[LIST]/totalNumOfOps[LIST] + "ms");
- }
- if (totalNumOfOps[DELETE] != 0) {
- System.out.println("Average deletion execution time: " +
- (double)executionTime[DELETE]/totalNumOfOps[DELETE] + "ms");
- System.out.println("Average create execution time: " +
- (double)executionTime[CREATE]/totalNumOfOps[CREATE] + "ms");
- System.out.println("Average write_close execution time: " +
- (double)executionTime[WRITE_CLOSE]/totalNumOfOps[WRITE_CLOSE] + "ms");
- }
- if (durations[0] != 0) {
- System.out.println("Average operations per second: " +
+ return exitCode;
+ }
+
+ protected static void printResults(PrintStream out) throws UnsupportedFileSystemException {
+ out.println("Result of running LoadGenerator against fileSystem: " +
+ FileContext.getFileContext().getDefaultFileSystem().getUri());
+ if (numOfOps[OPEN] != 0) {
+ out.println("Average open execution time: " +
+ (double)executionTime[OPEN]/numOfOps[OPEN] + "ms");
+ }
+ if (numOfOps[LIST] != 0) {
+ out.println("Average list execution time: " +
+ (double)executionTime[LIST]/numOfOps[LIST] + "ms");
+ }
+ if (numOfOps[DELETE] != 0) {
+ out.println("Average deletion execution time: " +
+ (double)executionTime[DELETE]/numOfOps[DELETE] + "ms");
+ out.println("Average create execution time: " +
+ (double)executionTime[CREATE]/numOfOps[CREATE] + "ms");
+ out.println("Average write_close execution time: " +
+ (double)executionTime[WRITE_CLOSE]/numOfOps[WRITE_CLOSE] + "ms");
+ }
+ if (totalTime != 0) {
+ out.println("Average operations per second: " +
(double)totalOps/totalTime +"ops/s");
}
- System.out.println();
- return exitCode;
+ out.println();
}
+
/** Parse the command line arguments and initialize the data */
- private int init(String[] args) throws IOException {
- try {
- fc = FileContext.getFileContext(getConf());
- } catch (IOException ioe) {
- System.err.println("Can not initialize the file system: " +
- ioe.getLocalizedMessage());
- return -1;
- }
- int hostHashCode = hostname.hashCode();
- boolean scriptSpecified = false;
-
- try {
+ protected int parseArgs(boolean runAsMapReduce, String[] args) throws IOException {
+ try {
for (int i = 0; i < args.length; i++) { // parse command line
if (args[i].equals("-scriptFile")) {
- if(loadScriptFile(args[++i]) == -1)
+ scriptFile = args[++i];
+ if (durations[0] > 0) {
+ System.err.println("Can't specify elapsedTime and use script.");
return -1;
- scriptSpecified = true;
+ }
} else if (args[i].equals("-readProbability")) {
- if(scriptSpecified) {
+ if (scriptFile != null) {
System.err.println("Can't specify probabilities and use script.");
return -1;
}
@@ -432,7 +512,7 @@ public class LoadGenerator extends Configured implements Tool {
return -1;
}
} else if (args[i].equals("-writeProbability")) {
- if(scriptSpecified) {
+ if (scriptFile != null) {
System.err.println("Can't specify probabilities and use script.");
return -1;
}
@@ -456,14 +536,18 @@ public class LoadGenerator extends Configured implements Tool {
} else if (args[i].equals("-startTime")) {
startTime = Long.parseLong(args[++i]);
} else if (args[i].equals("-elapsedTime")) {
- if(scriptSpecified) {
+ if (scriptFile != null) {
System.err.println("Can't specify elapsedTime and use script.");
return -1;
}
durations[0] = Long.parseLong(args[++i]);
} else if (args[i].equals("-seed")) {
- r = new Random(Long.parseLong(args[++i])+hostHashCode);
- } else {
+ seed = Long.parseLong(args[++i]);
+ r = new Random(seed);
+ } else if (args[i].equals("-flagFile")) {
+ LOG.info("got flagFile:" + flagFile);
+ flagFile = new Path(args[++i]);
+ }else {
System.err.println(USAGE);
ToolRunner.printGenericCommandUsage(System.err);
return -1;
@@ -475,6 +559,12 @@ public class LoadGenerator extends Configured implements Tool {
return -1;
}
+ // Load Script File if not MR; for MR scriptFile is loaded by Mapper
+ if (!runAsMapReduce && scriptFile != null) {
+ if(loadScriptFile(scriptFile, true) == -1)
+ return -1;
+ }
+
for(int i = 0; i < readProbs.length; i++) {
if (readProbs[i] + writeProbs[i] <0 || readProbs[i]+ writeProbs[i] > 1) {
System.err.println(
@@ -483,12 +573,7 @@ public class LoadGenerator extends Configured implements Tool {
return -1;
}
}
-
- if (r==null) {
- r = new Random(Time.now()+hostHashCode);
- }
-
- return initFileDirTables();
+ return 0;
}
private static void parseScriptLine(String line, ArrayList<Long> duration,
@@ -527,9 +612,25 @@ public class LoadGenerator extends Configured implements Tool {
* @return 0 if successful, -1 if not
* @throws IOException if errors with file IO
*/
- private int loadScriptFile(String filename) throws IOException {
- FileReader fr = new FileReader(new File(filename));
- BufferedReader br = new BufferedReader(fr);
+ protected static int loadScriptFile(String filename, boolean readLocally) throws IOException {
+
+ FileContext fc;
+ if (readLocally) { // read locally - program is run without MR
+ fc = FileContext.getLocalFSFileContext();
+ } else {
+ fc = FileContext.getFileContext(); // use default file system
+ }
+ DataInputStream in = null;
+ try {
+ in = fc.open(new Path(filename));
+ } catch (IOException e) {
+ System.err.println("Unable to open scriptFile: " + filename);
+
+ System.exit(-1);
+ }
+ InputStreamReader inr = new InputStreamReader(in);
+
+ BufferedReader br = new BufferedReader(inr);
ArrayList<Long> duration = new ArrayList<Long>();
ArrayList<Double> readProb = new ArrayList<Double>();
ArrayList<Double> writeProb = new ArrayList<Double>();
@@ -619,7 +720,7 @@ public class LoadGenerator extends Configured implements Tool {
* This allows multiple instances of this program, running on clock
* synchronized nodes, to start at roughly the same time.
*/
- private void barrier() {
+ private static void barrier() {
long sleepTime;
while ((sleepTime = startTime - Time.now()) > 0) {
try {
@@ -635,9 +736,7 @@ public class LoadGenerator extends Configured implements Tool {
* @throws Exception
*/
public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(),
- new LoadGenerator(), args);
+ int res = ToolRunner.run(new Configuration(), new LoadGenerator(), args);
System.exit(res);
}
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63947cce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGeneratorMR.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGeneratorMR.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGeneratorMR.java
new file mode 100644
index 0000000..c47d971
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/loadGenerator/LoadGeneratorMR.java
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.loadGenerator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.UnknownHostException;
+import java.util.EnumSet;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+
+/** The load generator is a tool for testing NameNode behavior under
+ * different client loads.
+ * The main code is in HadoopCommon, @LoadGenerator. This class, LoadGeneratorMR
+ * lets you run that LoadGenerator as a MapReduce job.
+ *
+ * The synopsis of the command is
+ * java LoadGeneratorMR
+ * -mr <numMapJobs> <outputDir> : results in outputDir/Results
+ * the rest of the args are the same as the original LoadGenerator.
+ *
+ */
+public class LoadGeneratorMR extends LoadGenerator {
+ public static final Log LOG = LogFactory.getLog(LoadGenerator.class);
+ private static int numMapTasks = 1;
+ private String mrOutDir;
+
+ final private static String USAGE_CMD = "java LoadGeneratorMR\n";
+ final private static String USAGE = USAGE_CMD
+ + "-mr <numMapJobs> <outputDir> [MUST be first 3 args] \n" + USAGE_ARGS ;
+
+ // Constant "keys" used to communicate between map and reduce
+ final private static Text OPEN_EXECTIME = new Text("OpenExecutionTime");
+ final private static Text NUMOPS_OPEN = new Text("NumOpsOpen");
+ final private static Text LIST_EXECTIME = new Text("ListExecutionTime");
+ final private static Text NUMOPS_LIST = new Text("NumOpsList");
+ final private static Text DELETE_EXECTIME = new Text("DeletionExecutionTime");
+ final private static Text NUMOPS_DELETE = new Text("NumOpsDelete");
+ final private static Text CREATE_EXECTIME = new Text("CreateExecutionTime");
+ final private static Text NUMOPS_CREATE = new Text("NumOpsCreate");
+ final private static Text WRITE_CLOSE_EXECTIME = new Text("WriteCloseExecutionTime");
+ final private static Text NUMOPS_WRITE_CLOSE = new Text("NumOpsWriteClose");
+ final private static Text ELAPSED_TIME = new Text("ElapsedTime");
+ final private static Text TOTALOPS = new Text("TotalOps");
+
+ // Config keys to pass args from Main to the Job
+ final private static String LG_ROOT = "LG.root";
+ final private static String LG_SCRIPTFILE = "LG.scriptFile";
+ final private static String LG_MAXDELAYBETWEENOPS = "LG.maxDelayBetweenOps";
+ final private static String LG_NUMOFTHREADS = "LG.numOfThreads";
+ final private static String LG_READPR = "LG.readPr";
+ final private static String LG_WRITEPR = "LG.writePr";
+ final private static String LG_SEED = "LG.r";
+ final private static String LG_NUMMAPTASKS = "LG.numMapTasks";
+ final private static String LG_ELAPSEDTIME = "LG.elapsedTime";
+ final private static String LG_STARTTIME = "LG.startTime";
+ final private static String LG_FLAGFILE = "LG.flagFile";
+
+
+ /** Constructor */
+ public LoadGeneratorMR() throws IOException, UnknownHostException {
+ super();
+ }
+
+ public LoadGeneratorMR(Configuration conf) throws IOException, UnknownHostException {
+ this();
+ setConf(conf);
+ }
+
+ /** Main function called by tool runner.
+ * It first initializes data by parsing the command line arguments.
+ * It then calls the loadGenerator
+ */
+ @Override
+ public int run(String[] args) throws Exception {
+ int exitCode = parseArgsMR(args);
+ if (exitCode != 0) {
+ return exitCode;
+ }
+ System.out.println("Running LoadGeneratorMR against fileSystem: " +
+ FileContext.getFileContext().getDefaultFileSystem().getUri());
+
+ return submitAsMapReduce(); // reducer will print the results
+ }
+
+
+ /**
+ * Parse the command line arguments and initialize the data.
+ * Only parse the first arg: -mr <numMapTasks> <mrOutDir> (MUST be first three Args)
+ * The rest are parsed by the Parent LoadGenerator
+ **/
+
+ private int parseArgsMR(String[] args) throws IOException {
+ try {
+ if (args.length >= 3 && args[0].equals("-mr")) {
+ numMapTasks = Integer.parseInt(args[1]);
+ mrOutDir = args[2];
+ if (mrOutDir.startsWith("-")) {
+ System.err.println("Missing output file parameter, instead got: "
+ + mrOutDir);
+ System.err.println(USAGE);
+ return -1;
+ }
+ } else {
+ System.err.println(USAGE);
+ ToolRunner.printGenericCommandUsage(System.err);
+ return -1;
+ }
+ String[] strippedArgs = new String[args.length - 3];
+ for (int i = 0; i < strippedArgs.length; i++) {
+ strippedArgs[i] = args[i + 3];
+ }
+ super.parseArgs(true, strippedArgs); // Parse normal LoadGenerator args
+ } catch (NumberFormatException e) {
+ System.err.println("Illegal parameter: " + e.getLocalizedMessage());
+ System.err.println(USAGE);
+ return -1;
+ }
+ return 0;
+ }
+
+ /** Main program
+ *
+ * @param args command line arguments
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new LoadGeneratorMR(), args);
+ System.exit(res);
+ }
+
+
+ // The following methods are only used when LoadGenerator is run as an MR job
+ /**
+ * Based on args we submit the LoadGenerator as MR job.
+ * Number of MapTasks is numMapTasks
+ * @return exitCode for job submission
+ */
+ private int submitAsMapReduce() {
+
+ System.out.println("Running as a MapReduce job with " +
+ numMapTasks + " mapTasks; Output to file " + mrOutDir);
+
+
+ Configuration conf = new Configuration(getConf());
+
+ // First set all the args of LoadGenerator as Conf vars to pass to MR tasks
+
+ conf.set(LG_ROOT , root.toString());
+ conf.setInt(LG_MAXDELAYBETWEENOPS, maxDelayBetweenOps);
+ conf.setInt(LG_NUMOFTHREADS, numOfThreads);
+ conf.set(LG_READPR, readProbs[0]+""); //Pass Double as string
+ conf.set(LG_WRITEPR, writeProbs[0]+""); //Pass Double as string
+ conf.setLong(LG_SEED, seed); //No idea what this is
+ conf.setInt(LG_NUMMAPTASKS, numMapTasks);
+ if (scriptFile == null && durations[0] <=0) {
+ System.err.println("When run as a MapReduce job, elapsed Time or ScriptFile must be specified");
+ System.exit(-1);
+ }
+ conf.setLong(LG_ELAPSEDTIME, durations[0]);
+ conf.setLong(LG_STARTTIME, startTime);
+ if (scriptFile != null) {
+ conf.set(LG_SCRIPTFILE , scriptFile);
+ }
+ conf.set(LG_FLAGFILE, flagFile.toString());
+
+ // Now set the necessary conf variables that apply to run MR itself.
+ JobConf jobConf = new JobConf(conf, LoadGenerator.class);
+ jobConf.setJobName("NNLoadGeneratorViaMR");
+ jobConf.setNumMapTasks(numMapTasks);
+ jobConf.setNumReduceTasks(1); // 1 reducer to collect the results
+
+ jobConf.setOutputKeyClass(Text.class);
+ jobConf.setOutputValueClass(IntWritable.class);
+
+ jobConf.setMapperClass(MapperThatRunsNNLoadGenerator.class);
+ jobConf.setReducerClass(ReducerThatCollectsLGdata.class);
+
+ jobConf.setInputFormat(DummyInputFormat.class);
+ jobConf.setOutputFormat(TextOutputFormat.class);
+
+ // Explicitly set number of max map attempts to 1.
+ jobConf.setMaxMapAttempts(1);
+ // Explicitly turn off speculative execution
+ jobConf.setSpeculativeExecution(false);
+
+ // This mapReduce job has no input but has output
+ FileOutputFormat.setOutputPath(jobConf, new Path(mrOutDir));
+
+ try {
+ JobClient.runJob(jobConf);
+ } catch (IOException e) {
+ System.err.println("Failed to run job: " + e.getMessage());
+ return -1;
+ }
+ return 0;
+
+ }
+
+
+ // Each split is empty
+ public static class EmptySplit implements InputSplit {
+ public void write(DataOutput out) throws IOException {}
+ public void readFields(DataInput in) throws IOException {}
+ public long getLength() {return 0L;}
+ public String[] getLocations() {return new String[0];}
+ }
+
+ // Dummy Input format to send 1 record - number of splits is numMapTasks
+ public static class DummyInputFormat extends Configured implements
+ InputFormat<LongWritable, Text> {
+
+ public InputSplit[] getSplits(JobConf conf, int numSplits) {
+ numSplits = conf.getInt("LG.numMapTasks", 1);
+ InputSplit[] ret = new InputSplit[numSplits];
+ for (int i = 0; i < numSplits; ++i) {
+ ret[i] = new EmptySplit();
+ }
+ return ret;
+ }
+
+ public RecordReader<LongWritable, Text> getRecordReader(
+ InputSplit ignored, JobConf conf, Reporter reporter) throws IOException {
+
+ return new RecordReader<LongWritable, Text>() {
+
+ boolean sentOneRecord = false;
+
+ public boolean next(LongWritable key, Text value)
+ throws IOException {
+ key.set(1);
+ value.set("dummy");
+ if (sentOneRecord == false) { // first call
+ sentOneRecord = true;
+ return true;
+ }
+ return false; // we have sent one record - we are done
+ }
+
+ public LongWritable createKey() {
+ return new LongWritable();
+ }
+ public Text createValue() {
+ return new Text();
+ }
+ public long getPos() throws IOException {
+ return 1;
+ }
+ public void close() throws IOException {
+ }
+ public float getProgress() throws IOException {
+ return 1;
+ }
+ };
+ }
+ }
+
+ // Mapper that ignores its dummy input record and instead runs a complete
+ // LoadGenerator session against the NameNode, then emits each collected
+ // statistic as a (metric-name, value) pair for the single reducer to sum.
+ public static class MapperThatRunsNNLoadGenerator extends MapReduceBase
+ implements Mapper<LongWritable, Text, Text, IntWritable> {
+
+ private JobConf jobConf;
+
+ @Override
+ public void configure(JobConf job) {
+ this.jobConf = job;
+ getArgsFromConfiguration(jobConf); // pull load-generator settings from the job conf
+ }
+
+ // Background thread that pings the task's Reporter every 30 seconds so
+ // the framework does not kill this long-running task for inactivity
+ // while load generation is in progress.
+ private class ProgressThread extends Thread {
+
+ boolean keepGoing; // while this is true, thread runs.
+ private Reporter reporter;
+
+ public ProgressThread(final Reporter r) {
+ this.reporter = r;
+ this.keepGoing = true;
+ }
+
+ public void run() {
+ while (keepGoing) {
+ // NOTE(review): interrupted() is the static Thread method and it
+ // CLEARS the interrupt flag; an interrupt here merely skips one
+ // sleep - only keepGoing=false actually stops the loop.
+ if (!ProgressThread.interrupted()) {
+ try {
+ sleep(30 * 1000);
+ } catch (InterruptedException e) {
+ // ignored: fall through and report progress immediately
+ }
+ }
+ reporter.progress();
+ }
+ }
+ }
+
+ // Runs the load generator to completion, prints local results, then
+ // forwards every counter to the reducer keyed by metric name. Counters
+ // are narrowed from long to int to fit IntWritable - NOTE(review): values
+ // above Integer.MAX_VALUE would be truncated.
+ public void map(LongWritable key, Text value,
+ OutputCollector<Text, IntWritable> output, Reporter reporter)
+ throws IOException {
+ ProgressThread progressThread = new ProgressThread(reporter);
+ progressThread.start();
+ try {
+ new LoadGenerator(jobConf).generateLoadOnNN();
+ System.out
+ .println("Finished generating load on NN, sending results to the reducer");
+ printResults(System.out);
+ progressThread.keepGoing = false; // stop the progress pinger
+ progressThread.join();
+
+ // Send results to Reducer
+ output.collect(OPEN_EXECTIME,
+ new IntWritable((int) executionTime[OPEN]));
+ output.collect(NUMOPS_OPEN, new IntWritable((int) numOfOps[OPEN]));
+
+ output.collect(LIST_EXECTIME,
+ new IntWritable((int) executionTime[LIST]));
+ output.collect(NUMOPS_LIST, new IntWritable((int) numOfOps[LIST]));
+
+ output.collect(DELETE_EXECTIME, new IntWritable(
+ (int) executionTime[DELETE]));
+ output.collect(NUMOPS_DELETE, new IntWritable((int) numOfOps[DELETE]));
+
+ output.collect(CREATE_EXECTIME, new IntWritable(
+ (int) executionTime[CREATE]));
+ output.collect(NUMOPS_CREATE, new IntWritable((int) numOfOps[CREATE]));
+
+ output.collect(WRITE_CLOSE_EXECTIME, new IntWritable(
+ (int) executionTime[WRITE_CLOSE]));
+ output.collect(NUMOPS_WRITE_CLOSE, new IntWritable(
+ (int) numOfOps[WRITE_CLOSE]));
+
+ output.collect(TOTALOPS, new IntWritable((int) totalOps));
+ output.collect(ELAPSED_TIME, new IntWritable((int) totalTime));
+
+ } catch (InterruptedException e) {
+ // interrupted while joining the progress thread; results may have
+ // been only partially emitted. NOTE(review): the interrupt status
+ // is not restored here.
+ e.printStackTrace();
+ }
+ }
+
+ // Reads all load-generator parameters out of the job configuration,
+ // mirroring the client-side command-line parsing.
+ // NOTE(review): validation failures call System.exit(-1), which kills the
+ // task JVM outright instead of failing the task with an exception; also,
+ // a scriptFile load error is printed but execution continues.
+ public void getArgsFromConfiguration(Configuration conf) {
+
+ maxDelayBetweenOps = conf.getInt(LG_MAXDELAYBETWEENOPS,
+ maxDelayBetweenOps);
+ numOfThreads = conf.getInt(LG_NUMOFTHREADS, numOfThreads);
+ readProbs[0] = Double.parseDouble(conf.get(LG_READPR, readProbs[0] + ""));
+ writeProbs[0] = Double.parseDouble(conf.get(LG_WRITEPR, writeProbs[0]
+ + ""));
+ seed = conf.getLong(LG_SEED, seed);
+ numMapTasks = conf.getInt(LG_NUMMAPTASKS, numMapTasks);
+ root = new Path(conf.get(LG_ROOT, root.toString()));
+ durations[0] = conf.getLong(LG_ELAPSEDTIME, 0);
+ startTime = conf.getLong(LG_STARTTIME, 0);
+ scriptFile = conf.get(LG_SCRIPTFILE, null);
+ flagFile = new Path(conf.get(LG_FLAGFILE, FLAGFILE_DEFAULT));
+ if (durations[0] > 0 && scriptFile != null) {
+ System.err.println("Cannot specify both ElapsedTime and ScriptFile, exiting");
+ System.exit(-1);
+ }
+
+ try {
+ if (scriptFile != null && loadScriptFile(scriptFile, false) < 0) {
+ System.err.println("Error in scriptFile, exiting");
+ System.exit(-1);
+ }
+ } catch (IOException e) {
+ System.err.println("Error loading script file " + scriptFile);
+ e.printStackTrace();
+ }
+ if (durations[0] <= 0) {
+ System.err.println("A duration of zero or less is not allowed when running via MapReduce.");
+ System.exit(-1);
+ }
+ }
+ }
+
+ // Single reducer that sums the per-mapper statistics for each metric key,
+ // stores the totals back into the shared static counter arrays, and on
+ // close() writes a human-readable "Results" file into the job output dir.
+ public static class ReducerThatCollectsLGdata extends MapReduceBase implements
+ Reducer<Text, IntWritable, Text, IntWritable> {
+ private IntWritable result = new IntWritable(); // reused output value
+ private JobConf jobConf;
+
+ @Override
+ public void configure(JobConf job) {
+ this.jobConf = job; // kept for close(): file system + task output path
+ }
+
+ // Sums all values for the key, then routes the total into the matching
+ // static counter (executionTime/numOfOps/totalOps/totalTime) so that
+ // printResults() in close() can report the aggregate.
+ @Override
+ public void reduce(Text key, Iterator<IntWritable> values,
+ OutputCollector<Text, IntWritable> output, Reporter reporter)
+ throws IOException {
+ int sum = 0;
+ while (values.hasNext()) {
+ sum += values.next().get();
+ }
+ if (key.equals(OPEN_EXECTIME)){
+ executionTime[OPEN] = sum;
+ } else if (key.equals(NUMOPS_OPEN)){
+ numOfOps[OPEN] = sum;
+ } else if (key.equals(LIST_EXECTIME)){
+ executionTime[LIST] = sum;
+ } else if (key.equals(NUMOPS_LIST)){
+ numOfOps[LIST] = sum;
+ } else if (key.equals(DELETE_EXECTIME)){
+ executionTime[DELETE] = sum;
+ } else if (key.equals(NUMOPS_DELETE)){
+ numOfOps[DELETE] = sum;
+ } else if (key.equals(CREATE_EXECTIME)){
+ executionTime[CREATE] = sum;
+ } else if (key.equals(NUMOPS_CREATE)){
+ numOfOps[CREATE] = sum;
+ } else if (key.equals(WRITE_CLOSE_EXECTIME)){
+ System.out.println(WRITE_CLOSE_EXECTIME + " = " + sum);
+ executionTime[WRITE_CLOSE]= sum;
+ } else if (key.equals(NUMOPS_WRITE_CLOSE)){
+ numOfOps[WRITE_CLOSE] = sum;
+ } else if (key.equals(TOTALOPS)){
+ totalOps = sum;
+ } else if (key.equals(ELAPSED_TIME)){
+ totalTime = sum;
+ }
+ result.set(sum);
+ output.collect(key, result); // also emit the aggregate as normal job output
+ // System.out.println("Key = " + key + " Sum is =" + sum);
+ // printResults(System.out);
+ }
+
+ // Writes the aggregated results to a "Results" file in the task output
+ // directory once all keys have been reduced. A file-system init failure
+ // is reported to stderr and the results file is skipped (best-effort).
+ @Override
+ public void close() throws IOException {
+ // Output the result to a file Results in the output dir
+ FileContext fc;
+ try {
+ fc = FileContext.getFileContext(jobConf);
+ } catch (IOException ioe) {
+ System.err.println("Can not initialize the file system: " +
+ ioe.getLocalizedMessage());
+ return;
+ }
+ FSDataOutputStream o = fc.create(FileOutputFormat.getTaskOutputPath(jobConf, "Results"),
+ EnumSet.of(CreateFlag.CREATE));
+
+ PrintStream out = new PrintStream(o);
+ printResults(out);
+ out.close();
+ o.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63947cce/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
index f2cd53c..b1dfe56 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/test/MapredTestDriver.java
@@ -41,6 +41,10 @@ import org.apache.hadoop.fs.DFSCIOTest;
import org.apache.hadoop.fs.DistributedFSCheck;
import org.apache.hadoop.io.FileBench;
import org.apache.hadoop.fs.JHLogAnalyzer;
+import org.apache.hadoop.fs.loadGenerator.DataGenerator;
+import org.apache.hadoop.fs.loadGenerator.LoadGenerator;
+import org.apache.hadoop.fs.loadGenerator.LoadGeneratorMR;
+import org.apache.hadoop.fs.loadGenerator.StructureGenerator;
import org.apache.hadoop.fs.slive.SliveTest;
/**
@@ -107,6 +111,14 @@ public class MapredTestDriver {
"Single process HDFS and MR cluster.");
pgd.addClass("largesorter", LargeSorter.class,
"Large-Sort tester");
+ pgd.addClass("NNloadGenerator", LoadGenerator.class,
+ "Generate load on Namenode using NN loadgenerator run WITHOUT MR");
+ pgd.addClass("NNloadGeneratorMR", LoadGeneratorMR.class,
+ "Generate load on Namenode using NN loadgenerator run as MR job");
+ pgd.addClass("NNstructureGenerator", StructureGenerator.class,
+ "Generate the structure to be used by NNdataGenerator");
+ pgd.addClass("NNdataGenerator", DataGenerator.class,
+ "Generate the data to be used by NNloadGenerator");
} catch(Throwable e) {
e.printStackTrace();
}