Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2007/12/10 23:43:45 UTC
svn commit: r603084 - in /lucene/hadoop/trunk: CHANGES.txt src/test/org/apache/hadoop/dfs/NNBench.java
Author: dhruba
Date: Mon Dec 10 14:43:44 2007
New Revision: 603084
URL: http://svn.apache.org/viewvc?rev=603084&view=rev
Log:
HADOOP-2000. Rewrite NNBench to measure namenode performance accurately.
It now uses the map-reduce framework for load generation.
(Mukund Madhugiri via dhruba)
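
For reference, the operations are meant to be run in sequence, with create_write
first, since open_read, rename and delete operate on the files it creates. A
hypothetical invocation, assuming the test classes are on the cluster classpath
(the option names come from the usage text in this patch; the numbers are
purely illustrative):

    bin/hadoop org.apache.hadoop.dfs.NNBench -operation create_write \
        -maps 12 -reduces 1 -blockSize 1 -bytesToWrite 0 \
        -numberOfFiles 1000 -replicationFactorPerFile 3 \
        -baseDir /benchmarks/NNBench

    bin/hadoop org.apache.hadoop.dfs.NNBench -operation open_read \
        -maps 12 -numberOfFiles 1000 -baseDir /benchmarks/NNBench \
        -readFileAfterOpen true

Each run appends its report to NNBench_results.log in the local working
directory, in addition to writing it to the log.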
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=603084&r1=603083&r2=603084&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Dec 10 14:43:44 2007
@@ -109,6 +109,10 @@
HADOOP-1327. Include website documentation for streaming. (Rob Weltman
via omalley)
+ HADOOP-2000. Rewrite NNBench to measure namenode performance accurately.
+ It now uses the map-reduce framework for load generation.
+ (Mukund Madhugiri via dhruba)
+
OPTIMIZATIONS
HADOOP-1898. Release the lock protecting the last time of the last stack
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java?rev=603084&r1=603083&r2=603084&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java Mon Dec 10 14:43:44 2007
@@ -20,322 +20,938 @@
import java.io.IOException;
import java.util.Date;
+import java.io.DataInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.io.File;
+import java.io.BufferedReader;
+import java.util.StringTokenizer;
+import java.net.InetAddress;
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
-import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile;
+
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.TaskTracker;
/**
* This program executes a specified operation that applies load to
- * the NameNode. Possible operations include create/writing files,
- * opening/reading files, renaming files, and deleting files.
+ * the NameNode.
*
* When run simultaneously on multiple nodes, this program functions
* as a stress-test and benchmark for namenode, especially when
* the number of bytes written to each file is small.
+ *
+ * Valid operations are:
+ * create_write
+ * open_read
+ * rename
+ * delete
+ *
+ * NOTE: The open_read, rename and delete operations assume that the files
+ * they operate on are already available. The create_write operation
+ * must be run before running the other operations.
*/
+
public class NNBench {
+ protected static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.dfs.NNBench");
+
+ protected static String CONTROL_DIR_NAME = "control";
+ protected static String OUTPUT_DIR_NAME = "output";
+ protected static String DATA_DIR_NAME = "data";
+ protected static final String DEFAULT_RES_FILE_NAME = "NNBench_results.log";
+ protected static final String NNBENCH_VERSION = "NameNode Benchmark 0.4";
+
+ public static String operation = "none";
+ public static long numberOfMaps = 1l; // default is 1
+ public static long numberOfReduces = 1l; // default is 1
+ public static long startTime =
+ System.currentTimeMillis() + (120 * 1000); // default is 'now' + 2min
+ public static long blockSize = 1l; // default is 1
+ public static int bytesToWrite = 0; // default is 0
+ public static long bytesPerChecksum = 1l; // default is 1
+ public static long numberOfFiles = 1l; // default is 1
+ public static short replicationFactorPerFile = 1; // default is 1
+ public static String baseDir = "/benchmarks/NNBench"; // default
+ public static boolean readFileAfterOpen = false; // default is to not read
+
+ // Supported operations
+ private static final String OP_CREATE_WRITE = "create_write";
+ private static final String OP_OPEN_READ = "open_read";
+ private static final String OP_RENAME = "rename";
+ private static final String OP_DELETE = "delete";
+
+ // To display in the format that matches the NN and DN log format
+ // Example: 2007-10-26 00:01:19,853
+ static SimpleDateFormat sdf =
+ new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss','S");
+
+ private static Configuration config = new Configuration();
- private static final Log LOG = LogFactory.getLog(
- "org.apache.hadoop.dfs.NNBench");
+ /**
+ * Clean up the files before a test run
+ *
+ * @throws IOException on error
+ */
+ private static void cleanupBeforeTestrun() throws IOException {
+ FileSystem tempFS = FileSystem.get(config);
+
+ // Delete the data directory only if it is the create/write operation
+ if (operation.equals(OP_CREATE_WRITE)) {
+ LOG.info("Deleting data directory");
+ tempFS.delete(new Path(baseDir, DATA_DIR_NAME));
+ }
+ tempFS.delete(new Path(baseDir, CONTROL_DIR_NAME));
+ tempFS.delete(new Path(baseDir, OUTPUT_DIR_NAME));
+ }
- // variables initialized from command line arguments
- private static long startTime = 0;
- private static int numFiles = 0;
- private static long bytesPerBlock = 1;
- private static long blocksPerFile = 0;
- private static long bytesPerFile = 1;
- private static Path baseDir = null;
-
- // variables initialized in main()
- private static FileSystem fileSys = null;
- private static Path taskDir = null;
- private static String uniqueId = null;
- private static byte[] buffer;
- private static long maxExceptionsPerFile = 200;
-
- /**
- * Returns when the current number of seconds from the epoch equals
- * the command line argument given by <code>-startTime</code>.
- * This allows multiple instances of this program, running on clock
- * synchronized nodes, to start at roughly the same time.
- */
- static void barrier() {
- long sleepTime;
- while ((sleepTime = startTime - System.currentTimeMillis()) > 0) {
+ /**
+ * Create control files before a test run.
+ * Number of files created is equal to the number of maps specified
+ *
+ * @throws IOException on error
+ */
+ private static void createControlFiles() throws IOException {
+ FileSystem tempFS = FileSystem.get(config);
+ LOG.info("Creating " + numberOfMaps + " control files");
+
+ for (int i = 0; i < numberOfMaps; i++) {
+ String strFileName = "NNBench_Controlfile_" + i;
+ Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME),
+ strFileName);
+
+ SequenceFile.Writer writer = null;
try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException ex) {
+ writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class,
+ LongWritable.class, CompressionType.NONE);
+ writer.append(new Text(strFileName), new LongWritable(0l));
+ } catch(Exception e) {
+ throw new IOException(e.getLocalizedMessage());
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ writer = null;
}
}
}
+ /**
+ * Display version
+ */
+ private static void displayVersion() {
+ System.out.println(NNBENCH_VERSION);
+ }
+
+ /**
+ * Display usage
+ */
+ private static void displayUsage() {
+ String usage =
+ "Usage: nnbench <options>\n" +
+ "Options:\n" +
+ "\t-operation <Available operations are " + OP_CREATE_WRITE + " " +
+ OP_OPEN_READ + " " + OP_RENAME + " " + OP_DELETE + ". " +
+ "This option is mandatory>\n" +
+ "\t * NOTE: The open_read, rename and delete operations assume " +
+ "that the files they operate on, are already available. " +
+ "The create_write operation must be run before running the " +
+ "other operations.\n" +
+ "\t-maps <number of maps. default is 1. This is not mandatory>\n" +
+ "\t-reduces <number of reduces. default is 1. This is not mandatory>\n" +
+ "\t-startTime <time to start, given in seconds from the epoch. " +
+ "Make sure this is far enough into the future, so all maps " +
+ "(operations) will start at the same time>. " +
+ "default is launch time + 2 mins. This is not mandatory \n" +
+ "\t-blockSize <Block size in bytes. default is 1. " +
+ "This is not mandatory>\n" +
+ "\t-bytesToWrite <Bytes to write. default is 0. " +
+ "This is not mandatory>\n" +
+ "\t-bytesPerChecksum <Bytes per checksum for the files. default is 1. " +
+ "This is not mandatory>\n" +
+ "\t-numberOfFiles <number of files to create. default is 1. " +
+ "This is not mandatory>\n" +
+ "\t-replicationFactorPerFile <Replication factor for the files." +
+ " default is 1. This is not mandatory>\n" +
+ "\t-baseDir <base DFS path. default is /becnhmarks/NNBench. " +
+ "This is not mandatory>\n" +
+ "\t-readFileAfterOpen <true or false. if true, it reads the file and " +
+ "reports the average time to read. This is valid with the open_read " +
+ "operation. default is false. This is not mandatory>\n" +
+ "\t-help: Display the help statement\n";
+
- static private void handleException(String operation, Throwable e,
- int singleFileExceptions) {
- LOG.warn("Exception while " + operation + ": " +
- StringUtils.stringifyException(e));
- if (singleFileExceptions >= maxExceptionsPerFile) {
- throw new RuntimeException(singleFileExceptions +
- " exceptions for a single file exceeds threshold. Aborting");
+ System.out.println(usage);
+ }
+
+ /**
+ * check for arguments and fail if the values are not specified
+ */
+ public static void checkArgs(final int index, final int length) {
+ if (index == length) {
+ displayUsage();
+ System.exit(-1);
}
}
/**
- * Create and write to a given number of files. Repeat each remote
- * operation until it succeeds (does not throw an exception).
- *
- * @return the number of exceptions caught
+ * Parse input arguments
+ *
+ * @param args Command line inputs
*/
- static int createWrite() {
- int totalExceptions = 0;
- FSDataOutputStream out = null;
- boolean success = false;
- for (int index = 0; index < numFiles; index++) {
- int singleFileExceptions = 0;
- do { // create file until it succeeds or max exceptions reached
- try {
- out = fileSys.create(
- new Path(taskDir, "" + index), false, 512, (short)1, bytesPerBlock);
- success = true;
- } catch (IOException ioe) {
- success=false;
- totalExceptions++;
- handleException("creating file #" + index, ioe, ++singleFileExceptions);
- }
- } while (!success);
- long toBeWritten = bytesPerFile;
- while (toBeWritten > 0) {
- int nbytes = (int) Math.min(buffer.length, toBeWritten);
- toBeWritten -= nbytes;
- try { // only try once
- out.write(buffer, 0, nbytes);
- } catch (IOException ioe) {
- totalExceptions++;
- handleException("writing to file #" + index, ioe, ++singleFileExceptions);
- }
+ public static void parseInputs(final String[] args) {
+ // If there are no command line arguments, exit
+ if (args.length == 0) {
+ displayUsage();
+ System.exit(-1);
+ }
+
+ // Parse command line args
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-operation")) {
+ operation = args[++i];
+ } else if (args[i].equals("-maps")) {
+ checkArgs(i + 1, args.length);
+ numberOfMaps = Long.parseLong(args[++i]);
+ } else if (args[i].equals("-reduces")) {
+ checkArgs(i + 1, args.length);
+ numberOfReduces = Long.parseLong(args[++i]);
+ } else if (args[i].equals("-startTime")) {
+ checkArgs(i + 1, args.length);
+ startTime = Long.parseLong(args[++i]) * 1000;
+ } else if (args[i].equals("-blockSize")) {
+ checkArgs(i + 1, args.length);
+ blockSize = Long.parseLong(args[++i]);
+ } else if (args[i].equals("-bytesToWrite")) {
+ checkArgs(i + 1, args.length);
+ bytesToWrite = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-bytesPerChecksum")) {
+ checkArgs(i + 1, args.length);
+ bytesPerChecksum = Long.parseLong(args[++i]);
+ } else if (args[i].equals("-numberOfFiles")) {
+ checkArgs(i + 1, args.length);
+ numberOfFiles = Long.parseLong(args[++i]);
+ } else if (args[i].equals("-replicationFactorPerFile")) {
+ checkArgs(i + 1, args.length);
+ replicationFactorPerFile = Short.parseShort(args[++i]);
+ } else if (args[i].equals("-baseDir")) {
+ checkArgs(i + 1, args.length);
+ baseDir = args[++i];
+ } else if (args[i].equals("-readFileAfterOpen")) {
+ checkArgs(i + 1, args.length);
+ readFileAfterOpen = Boolean.parseBoolean(args[++i]);
+ } else if (args[i].equals("-help")) {
+ displayUsage();
+ System.exit(-1);
}
- do { // close file until it succeeds
- try {
- out.close();
- success = true;
- } catch (IOException ioe) {
- success=false;
- totalExceptions++;
- handleException("closing file #" + index, ioe, ++singleFileExceptions);
- }
- } while (!success);
}
- return totalExceptions;
- }
+ LOG.info("Test Inputs: ");
+ LOG.info(" Test Operation: " + operation);
+ LOG.info(" Start time: " + sdf.format(new Date(startTime)));
+ LOG.info(" Number of maps: " + numberOfMaps);
+ LOG.info(" Number of reduces: " + numberOfReduces);
+ LOG.info(" Block Size: " + blockSize);
+ LOG.info(" Bytes to write: " + bytesToWrite);
+ LOG.info(" Bytes per checksum: " + bytesPerChecksum);
+ LOG.info(" Number of files: " + numberOfFiles);
+ LOG.info(" Replication factor: " + replicationFactorPerFile);
+ LOG.info(" Base dir: " + baseDir);
+ LOG.info(" Read file after open: " + readFileAfterOpen);
+
+ // Set user-defined parameters, so the map method can access the values
+ config.set("test.nnbench.operation", operation);
+ config.setLong("test.nnbench.maps", numberOfMaps);
+ config.setLong("test.nnbench.reduces", numberOfReduces);
+ config.setLong("test.nnbench.starttime", startTime);
+ config.setLong("test.nnbench.blocksize", blockSize);
+ config.setInt("test.nnbench.bytestowrite", bytesToWrite);
+ config.setLong("test.nnbench.bytesperchecksum", bytesPerChecksum);
+ config.setLong("test.nnbench.numberoffiles", numberOfFiles);
+ config.setInt("test.nnbench.replicationfactor",
+ (int) replicationFactorPerFile);
+ config.set("test.nnbench.basedir", baseDir);
+ config.setBoolean("test.nnbench.readFileAfterOpen", readFileAfterOpen);
+
+ config.set("test.nnbench.datadir.name", DATA_DIR_NAME);
+ config.set("test.nnbench.outputdir.name", OUTPUT_DIR_NAME);
+ config.set("test.nnbench.controldir.name", CONTROL_DIR_NAME);
+ }
+
/**
- * Open and read a given number of files.
- *
- * @return the number of exceptions caught
+ * Analyze the results
+ *
+ * @throws IOException on error
*/
- static int openRead() {
- int totalExceptions = 0;
- FSDataInputStream in = null;
- for (int index = 0; index < numFiles; index++) {
- int singleFileExceptions = 0;
- try {
- in = fileSys.open(new Path(taskDir, "" + index), 512);
- long toBeRead = bytesPerFile;
- while (toBeRead > 0) {
- int nbytes = (int) Math.min(buffer.length, toBeRead);
- toBeRead -= nbytes;
- try { // only try once
- in.read(buffer, 0, nbytes);
- } catch (IOException ioe) {
- totalExceptions++;
- handleException("reading from file #" + index, ioe, ++singleFileExceptions);
- }
- }
- in.close();
- } catch (IOException ioe) {
- totalExceptions++;
- handleException("opening file #" + index, ioe, ++singleFileExceptions);
+ private static void analyzeResults() throws IOException {
+ final FileSystem fs = FileSystem.get(config);
+ Path reduceFile = new Path(new Path(baseDir, OUTPUT_DIR_NAME),
+ "part-00000");
+
+ DataInputStream in;
+ in = new DataInputStream(fs.open(reduceFile));
+
+ BufferedReader lines;
+ lines = new BufferedReader(new InputStreamReader(in));
+
+ long totalTimeAL1 = 0l;
+ long totalTimeAL2 = 0l;
+ long totalTimeTPmS = 0l;
+ long lateMaps = 0l;
+ long numOfExceptions = 0l;
+ long successfulFileOps = 0l;
+
+ long mapStartTimeTPmS = 0l;
+ long mapEndTimeTPmS = 0l;
+
+ String resultTPSLine1 = null;
+ String resultTPSLine2 = null;
+ String resultALLine1 = null;
+ String resultALLine2 = null;
+
+ String line;
+ while((line = lines.readLine()) != null) {
+ StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%;");
+ String attr = tokens.nextToken();
+ if (attr.endsWith(":totalTimeAL1")) {
+ totalTimeAL1 = Long.parseLong(tokens.nextToken());
+ } else if (attr.endsWith(":totalTimeAL2")) {
+ totalTimeAL2 = Long.parseLong(tokens.nextToken());
+ } else if (attr.endsWith(":totalTimeTPmS")) {
+ totalTimeTPmS = Long.parseLong(tokens.nextToken());
+ } else if (attr.endsWith(":latemaps")) {
+ lateMaps = Long.parseLong(tokens.nextToken());
+ } else if (attr.endsWith(":numOfExceptions")) {
+ numOfExceptions = Long.parseLong(tokens.nextToken());
+ } else if (attr.endsWith(":successfulFileOps")) {
+ successfulFileOps = Long.parseLong(tokens.nextToken());
+ } else if (attr.endsWith(":mapStartTimeTPmS")) {
+ mapStartTimeTPmS = Long.parseLong(tokens.nextToken());
+ } else if (attr.endsWith(":mapEndTimeTPmS")) {
+ mapEndTimeTPmS = Long.parseLong(tokens.nextToken());
}
}
- return totalExceptions;
- }
+ // Average latency is the total accumulated time divided by the
+ // number of successful file operations
+ double avgLatency1 = (double) totalTimeAL1 / (double) successfulFileOps;
+ double avgLatency2 = (double) totalTimeAL2 / (double) successfulFileOps;
+
+ // The time it takes for the longest running map is measured. Using that,
+ // cluster transactions per second is calculated. It includes time to
+ // retry any of the failed operations
+ double longestMapTimeTPmS = (double) (mapEndTimeTPmS - mapStartTimeTPmS);
+ double totalTimeTPS = (longestMapTimeTPmS == 0) ?
+ (1000 * successfulFileOps) :
+ (double) (1000 * successfulFileOps) / (double) longestMapTimeTPmS;
+
+ // The time it takes to perform 'n' operations is calculated (in ms),
+ // n being the number of files. Using that time, the average execution
+ // time is calculated. It includes time to retry any of the
+ // failed operations
+ double AverageExecutionTime = (totalTimeTPmS == 0) ?
+ (double) successfulFileOps :
+ (double) totalTimeTPmS / (double) successfulFileOps;
+
+ if (operation.equals(OP_CREATE_WRITE)) {
+ // For create/write/close, it is treated as two transactions,
+ // since a file create from a client perspective involves create and close
+ resultTPSLine1 = " TPS: Create/Write/Close: " +
+ (int) (totalTimeTPS * 2);
+ resultTPSLine2 = "Avg exec time (ms): Create/Write/Close: " +
+ (double) AverageExecutionTime;
+ resultALLine1 = " Avg Lat (ms): Create/Write: " + avgLatency1;
+ resultALLine2 = " Avg Lat (ms): Close: " + avgLatency2;
+ } else if (operation.equals(OP_OPEN_READ)) {
+ resultTPSLine1 = " TPS: Open/Read: " +
+ (int) totalTimeTPS;
+ resultTPSLine2 = " Avg Exec time (ms): Open/Read: " +
+ (double) AverageExecutionTime;
+ resultALLine1 = " Avg Lat (ms): Open: " + avgLatency1;
+ if (readFileAfterOpen) {
+ resultALLine2 = " Avg Lat (ms): Read: " + avgLatency2;
+ }
+ } else if (operation.equals(OP_RENAME)) {
+ resultTPSLine1 = " TPS: Rename: " +
+ (int) totalTimeTPS;
+ resultTPSLine2 = " Avg Exec time (ms): Rename: " +
+ (double) AverageExecutionTime;
+ resultALLine1 = " Avg Lat (ms): Rename: " + avgLatency1;
+ } else if (operation.equals(OP_DELETE)) {
+ resultTPSLine1 = " TPS: Delete: " +
+ (int) totalTimeTPS;
+ resultTPSLine2 = " Avg Exec time (ms): Delete: " +
+ (double) AverageExecutionTime;
+ resultALLine1 = " Avg Lat (ms): Delete: " + avgLatency1;
+ }
+
+ String resultLines[] = {
+ "-------------- NNBench -------------- : ",
+ " Version: " + NNBENCH_VERSION,
+ " Date & time: " + sdf.format(new Date(
+ System.currentTimeMillis())),
+ "",
+ " Test Operation: " + operation,
+ " Start time: " +
+ sdf.format(new Date(startTime)),
+ " Maps to run: " + numberOfMaps,
+ " Reduces to run: " + numberOfReduces,
+ " Block Size (bytes): " + blockSize,
+ " Bytes to write: " + bytesToWrite,
+ " Bytes per checksum: " + bytesPerChecksum,
+ " Number of files: " + numberOfFiles,
+ " Replication factor: " + replicationFactorPerFile,
+ " Successful file operations: " + successfulFileOps,
+ "",
+ " # maps that missed the barrier: " + lateMaps,
+ " # exceptions: " + numOfExceptions,
+ "",
+ resultTPSLine1,
+ resultTPSLine2,
+ resultALLine1,
+ resultALLine2,
+ "",
+ " RAW DATA: AL Total #1: " + totalTimeAL1,
+ " RAW DATA: AL Total #2: " + totalTimeAL2,
+ " RAW DATA: TPS Total (ms): " + totalTimeTPmS,
+ " RAW DATA: Longest Map Time (ms): " + longestMapTimeTPmS,
+ " RAW DATA: Late maps: " + lateMaps,
+ " RAW DATA: # of exceptions: " + numOfExceptions,
+ "" };
+
+ PrintStream res = new PrintStream(new FileOutputStream(
+ new File(DEFAULT_RES_FILE_NAME), true));
+
+ // Write to a file and also dump to log
+ for(int i = 0; i < resultLines.length; i++) {
+ LOG.info(resultLines[i]);
+ res.println(resultLines[i]);
+ }
+ res.close();
+ }
+
/**
- * Rename a given number of files. Repeat each remote
- * operation until it succeeds (does not throw an exception).
- *
- * @return the number of exceptions caught
+ * Run the test
+ *
+ * @throws IOException on error
*/
- static int rename() {
- int totalExceptions = 0;
- boolean success = false;
- for (int index = 0; index < numFiles; index++) {
- int singleFileExceptions = 0;
- do { // rename file until it succeeds
- try {
- boolean result = fileSys.rename(
- new Path(taskDir, "" + index), new Path(taskDir, "A" + index));
- success = true;
- } catch (IOException ioe) {
- success=false;
- totalExceptions++;
- handleException("creating file #" + index, ioe, ++singleFileExceptions);
- }
- } while (!success);
- }
- return totalExceptions;
- }
+ public static void runTests() throws IOException {
+ config.setLong("io.bytes.per.checksum", bytesPerChecksum);
+
+ JobConf job = new JobConf(config, NNBench.class);
+
+ job.setJobName("NNBench-" + operation);
+ job.setInputPath(new Path(baseDir, CONTROL_DIR_NAME));
+ job.setInputFormat(SequenceFileInputFormat.class);
+
+ // Explicitly set number of max map attempts to 1.
+ job.setMaxMapAttempts(1);
+ // Explicitly turn off speculative execution
+ job.setSpeculativeExecution(false);
+
+ job.setMapperClass(NNBenchMapper.class);
+ job.setReducerClass(NNBenchReducer.class);
+
+ job.setOutputPath(new Path(baseDir, OUTPUT_DIR_NAME));
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ job.setNumReduceTasks((int) numberOfReduces);
+ JobClient.runJob(job);
+ }
+
/**
- * Delete a given number of files. Repeat each remote
- * operation until it succeeds (does not throw an exception).
- *
- * @return the number of exceptions caught
- */
- static int delete() {
- int totalExceptions = 0;
- boolean success = false;
- for (int index = 0; index < numFiles; index++) {
- int singleFileExceptions = 0;
- do { // delete file until it succeeds
- try {
- boolean result = fileSys.delete(new Path(taskDir, "A" + index));
- success = true;
- } catch (IOException ioe) {
- success=false;
- totalExceptions++;
- handleException("creating file #" + index, ioe, ++singleFileExceptions);
- }
- } while (!success);
+ * Validate the inputs
+ */
+ public static void validateInputs() {
+ // If it is not one of the four operations, then fail
+ if (!operation.equals(OP_CREATE_WRITE) &&
+ !operation.equals(OP_OPEN_READ) &&
+ !operation.equals(OP_RENAME) &&
+ !operation.equals(OP_DELETE)) {
+ System.err.println("Error: Unknown operation: " + operation);
+ displayUsage();
+ System.exit(-1);
+ }
+
+ // If number of maps is a negative number, then fail
+ // Hadoop allows the number of maps to be 0
+ if (numberOfMaps < 0) {
+ System.err.println("Error: Number of maps must be a positive number");
+ displayUsage();
+ System.exit(-1);
+ }
+
+ // If number of reduces is a negative number or 0, then fail
+ if (numberOfReduces <= 0) {
+ System.err.println("Error: Number of reduces must be a positive number");
+ displayUsage();
+ System.exit(-1);
+ }
+
+ // If blocksize is a negative number or 0, then fail
+ if (blockSize <= 0) {
+ System.err.println("Error: Block size must be a positive number");
+ displayUsage();
+ System.exit(-1);
+ }
+
+ // If bytes to write is a negative number, then fail
+ if (bytesToWrite < 0) {
+ System.err.println("Error: Bytes to write must be a positive number");
+ displayUsage();
+ System.exit(-1);
+ }
+
+ // If bytes per checksum is a negative number or 0, then fail
+ // (it is used as a divisor when validating the block size below)
+ if (bytesPerChecksum <= 0) {
+ System.err.println("Error: Bytes per checksum must be a positive number");
+ displayUsage();
+ System.exit(-1);
+ }
+
+ // If number of files is a negative number, then fail
+ if (numberOfFiles < 0) {
+ System.err.println("Error: Number of files must be a positive number");
+ displayUsage();
+ System.exit(-1);
+ }
+
+ // If replication factor is a negative number, then fail
+ if (replicationFactorPerFile < 0) {
+ System.err.println("Error: Replication factor must be a positive number");
+ displayUsage();
+ System.exit(-1);
+ }
+
+ // If block size is not a multiple of bytesperchecksum, fail
+ if (blockSize % bytesPerChecksum != 0) {
+ System.err.println("Error: Block Size in bytes must be a multiple of " +
+ "bytes per checksum: ");
+ displayUsage();
+ System.exit(-1);
}
- return totalExceptions;
}
+ /**
+ * Main method for running the NNBench benchmarks
+ *
+ * @throws IOException indicates a problem with test startup
+ */
+ public static void main(String[] args) throws IOException {
+ // Display the application version string
+ displayVersion();
+
+ // Parse the inputs
+ parseInputs(args);
+
+ // Validate inputs
+ validateInputs();
+
+ // Clean up files before the test run
+ cleanupBeforeTestrun();
+
+ // Create control files before test run
+ createControlFiles();
+
+ // Run the tests as a map reduce job
+ runTests();
+ // Analyze results
+ analyzeResults();
+ }
+
+
/**
- * This launches a given namenode operation (<code>-operation</code>),
- * starting at a given time (<code>-startTime</code>). The files used
- * by the openRead, rename, and delete operations are the same files
- * created by the createWrite operation. Typically, the program
- * would be run four times, once for each operation in this order:
- * createWrite, openRead, rename, delete.
- *
- * <pre>
- * Usage: nnbench
- * -operation <one of createWrite, openRead, rename, or delete>
- * -baseDir <base output/input DFS path>
- * -startTime <time to start, given in seconds from the epoch>
- * -numFiles <number of files to create, read, rename, or delete>
- * -blocksPerFile <number of blocks to create per file>
- * [-bytesPerBlock <number of bytes to write to each block, default is 1>]
- * [-bytesPerChecksum <value for io.bytes.per.checksum>]
- * </pre>
- *
- * @throws IOException indicates a problem with test startup
+ * Mapper class
*/
- public static void main(String[] args) throws IOException {
- String version = "NameNodeBenchmark.0.3";
- System.out.println(version);
- int bytesPerChecksum = -1;
+ static class NNBenchMapper extends Configured
+ implements Mapper<Text, LongWritable, Text, Text> {
+ FileSystem filesystem = null;
+ private String hostName = null;
+
+ long numberOfFiles = 1l;
+ long blkSize = 1l;
+ short replFactor = 1;
+ int bytesToWrite = 0;
+ String baseDir = null;
+ String dataDirName = null;
+ String op = null;
+ boolean readFile = false;
+ final int MAX_OPERATION_EXCEPTIONS = 1000;
+
+ // Data to collect from the operation
+ int numOfExceptions = 0;
+ long startTimeAL = 0l;
+ long totalTimeAL1 = 0l;
+ long totalTimeAL2 = 0l;
+ long successfulFileOps = 0l;
+
+ /**
+ * Constructor
+ */
+ public NNBenchMapper() {
+ super(config);
+
+ try {
+ filesystem = FileSystem.get(config);
+ } catch(Exception e) {
+ throw new RuntimeException("Cannot get file system.", e);
+ }
+
+ try {
+ hostName = InetAddress.getLocalHost().getHostName();
+ } catch(Exception e) {
+ throw new RuntimeException("Error getting hostname", e);
+ }
+ }
- String usage =
- "Usage: nnbench " +
- " -operation <one of createWrite, openRead, rename, or delete> " +
- " -baseDir <base output/input DFS path> " +
- " -startTime <time to start, given in seconds from the epoch> " +
- " -numFiles <number of files to create> " +
- " -blocksPerFile <number of blocks to create per file> " +
- " [-bytesPerBlock <number of bytes to write to each block, default is 1>] " +
- " [-bytesPerChecksum <value for io.bytes.per.checksum>]" +
- "Note: bytesPerBlock MUST be a multiple of bytesPerChecksum";
-
- String operation = null;
- for (int i = 0; i < args.length; i++) { // parse command line
- if (args[i].equals("-baseDir")) {
- baseDir = new Path(args[++i]);
- } else if (args[i].equals("-numFiles")) {
- numFiles = Integer.parseInt(args[++i]);
- } else if (args[i].equals("-blocksPerFile")) {
- blocksPerFile = Integer.parseInt(args[++i]);
- } else if (args[i].equals("-bytesPerBlock")) {
- bytesPerBlock = Long.parseLong(args[++i]);
- } else if (args[i].equals("-bytesPerChecksum")) {
- bytesPerChecksum = Integer.parseInt(args[++i]);
- } else if (args[i].equals("-startTime")) {
- startTime = Long.parseLong(args[++i]) * 1000;
- } else if (args[i].equals("-operation")) {
- operation = args[++i];
+ /**
+ * Mapper base implementation
+ */
+ public void configure(JobConf conf) {
+ setConf(conf);
+ }
+
+ /**
+ * Mapper base implementation
+ */
+ public void close() throws IOException {
+ }
+
+ /**
+ * Returns when the current number of seconds from the epoch equals
+ * the command line argument given by <code>-startTime</code>.
+ * This allows multiple instances of this program, running on clock
+ * synchronized nodes, to start at roughly the same time.
+ */
+ private boolean barrier() {
+ Configuration conf = filesystem.getConf();
+ long startTime = conf.getLong("test.nnbench.starttime", 0l);
+ long currentTime = System.currentTimeMillis();
+ long sleepTime = startTime - currentTime;
+ boolean retVal = false;
+
+ // If the sleep time is greater than 0, then sleep and return
+ if (sleepTime > 0) {
+ TaskTracker.LOG.info("Waiting in barrier for: " + sleepTime + " ms");
+
+ try {
+ Thread.sleep(sleepTime);
+ retVal = true;
+ } catch (Exception e) {
+ retVal = false;
+ }
+ }
+
+ return retVal;
+ }
+
+ /**
+ * Map method
+ */
+ public void map(Text key,
+ LongWritable value,
+ OutputCollector<Text, Text> output,
+ Reporter reporter) throws IOException {
+ Configuration conf = filesystem.getConf();
+
+ numberOfFiles = conf.getLong("test.nnbench.numberoffiles", 1l);
+ blkSize = conf.getLong("test.nnbench.blocksize", 1l);
+ replFactor = (short) (conf.getInt("test.nnbench.replicationfactor", 1));
+ bytesToWrite = conf.getInt("test.nnbench.bytestowrite", 0);
+ baseDir = conf.get("test.nnbench.basedir");
+ dataDirName = conf.get("test.nnbench.datadir.name");
+ op = conf.get("test.nnbench.operation");
+ readFile = conf.getBoolean("test.nnbench.readFileAfterOpen", false);
+
+ long totalTimeTPmS = 0l;
+ long startTimeTPmS = 0l;
+ long endTimeTPmS = 0l;
+
+ numOfExceptions = 0;
+ startTimeAL = 0l;
+ totalTimeAL1 = 0l;
+ totalTimeAL2 = 0l;
+ successfulFileOps = 0l;
+
+ if (barrier()) {
+ if (op.equals(OP_CREATE_WRITE)) {
+ startTimeTPmS = System.currentTimeMillis();
+ doCreateWriteOp("file_" + hostName + "_", output, reporter);
+ } else if (op.equals(OP_OPEN_READ)) {
+ startTimeTPmS = System.currentTimeMillis();
+ doOpenReadOp("file_" + hostName + "_", output, reporter);
+ } else if (op.equals(OP_RENAME)) {
+ startTimeTPmS = System.currentTimeMillis();
+ doRenameOp("file_" + hostName + "_", output, reporter);
+ } else if (op.equals(OP_DELETE)) {
+ startTimeTPmS = System.currentTimeMillis();
+ doDeleteOp("file_" + hostName + "_", output, reporter);
+ }
+
+ endTimeTPmS = System.currentTimeMillis();
+ totalTimeTPmS = endTimeTPmS - startTimeTPmS;
} else {
- System.out.println(usage);
- System.exit(-1);
+ output.collect(new Text("l:latemaps"), new Text("1"));
}
+
+ // collect after the map end time is measured
+ output.collect(new Text("l:totalTimeAL1"),
+ new Text(String.valueOf(totalTimeAL1)));
+ output.collect(new Text("l:totalTimeAL2"),
+ new Text(String.valueOf(totalTimeAL2)));
+ output.collect(new Text("l:numOfExceptions"),
+ new Text(String.valueOf(numOfExceptions)));
+ output.collect(new Text("l:successfulFileOps"),
+ new Text(String.valueOf(successfulFileOps)));
+ output.collect(new Text("l:totalTimeTPmS"),
+ new Text(String.valueOf(totalTimeTPmS)));
+ output.collect(new Text("min:mapStartTimeTPmS"),
+ new Text(String.valueOf(startTimeTPmS)));
+ output.collect(new Text("max:mapEndTimeTPmS"),
+ new Text(String.valueOf(endTimeTPmS)));
}
- bytesPerFile = bytesPerBlock * blocksPerFile;
- JobConf jobConf = new JobConf(new Configuration(), NNBench.class);
+ /**
+ * Create and Write operation.
+ */
+ private void doCreateWriteOp(String name,
+ OutputCollector<Text, Text> output,
+ Reporter reporter) {
+ FSDataOutputStream out = null;
+ byte[] buffer = new byte[bytesToWrite];
+
+ for (long l = 0l; l < numberOfFiles; l++) {
+ Path filePath = new Path(new Path(baseDir, dataDirName),
+ name + "_" + l);
+
+ boolean successfulOp = false;
+ while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
+ try {
+ // Set up timer for measuring AL (transaction #1)
+ startTimeAL = System.currentTimeMillis();
+ // Create the file
+ // Use a buffer size of 512
+ out = filesystem.create(filePath,
+ true,
+ 512,
+ replFactor,
+ blkSize);
+ out.write(buffer);
+ totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
+
+ // Close the file / file output stream
+ // Set up timers for measuring AL (transaction #2)
+ startTimeAL = System.currentTimeMillis();
+ out.close();
+
+ totalTimeAL2 += (System.currentTimeMillis() - startTimeAL);
+ successfulOp = true;
+ successfulFileOps ++;
+ } catch (IOException e) {
+ TaskTracker.LOG.info("Exception recorded in op: " +
+ "Create/Write/Close");
+
+ numOfExceptions++;
+ }
+ }
+ }
+ }
- if ( bytesPerChecksum < 0 ) { // if it is not set in cmdline
- bytesPerChecksum = jobConf.getInt("io.bytes.per.checksum", 512);
+ /**
+ * Open operation
+ */
+ private void doOpenReadOp(String name,
+ OutputCollector<Text, Text> output,
+ Reporter reporter) {
+ FSDataInputStream input = null;
+ byte[] buffer = new byte[bytesToWrite];
+
+ for (long l = 0l; l < numberOfFiles; l++) {
+ Path filePath = new Path(new Path(baseDir, dataDirName),
+ name + "_" + l);
+
+ boolean successfulOp = false;
+ while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
+ try {
+ // Set up timer for measuring AL
+ startTimeAL = System.currentTimeMillis();
+ input = filesystem.open(filePath);
+ totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
+
+ // If the file needs to be read (specified at command line)
+ if (readFile) {
+ startTimeAL = System.currentTimeMillis();
+ input.readFully(buffer);
+
+ totalTimeAL2 += (System.currentTimeMillis() - startTimeAL);
+ }
+ input.close();
+ successfulOp = true;
+ successfulFileOps ++;
+ } catch (IOException e) {
+ TaskTracker.LOG.info("Exception recorded in op: OpenRead " + e);
+ numOfExceptions++;
+ }
+ }
+ }
}
- jobConf.set("io.bytes.per.checksum", Integer.toString(bytesPerChecksum));
- System.out.println("Inputs: ");
- System.out.println(" operation: " + operation);
- System.out.println(" baseDir: " + baseDir);
- System.out.println(" startTime: " + startTime);
- System.out.println(" numFiles: " + numFiles);
- System.out.println(" blocksPerFile: " + blocksPerFile);
- System.out.println(" bytesPerBlock: " + bytesPerBlock);
- System.out.println(" bytesPerChecksum: " + bytesPerChecksum);
-
- if (operation == null || // verify args
- baseDir == null ||
- numFiles < 1 ||
- blocksPerFile < 1 ||
- bytesPerBlock < 0 ||
- bytesPerBlock % bytesPerChecksum != 0)
- {
- System.err.println(usage);
- System.exit(-1);
+ /**
+ * Rename operation
+ */
+ private void doRenameOp(String name,
+ OutputCollector<Text, Text> output,
+ Reporter reporter) {
+ for (long l = 0l; l < numberOfFiles; l++) {
+ Path filePath = new Path(new Path(baseDir, dataDirName),
+ name + "_" + l);
+ Path filePathR = new Path(new Path(baseDir, dataDirName),
+ name + "_r_" + l);
+
+ boolean successfulOp = false;
+ while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
+ try {
+ // Set up timer for measuring AL
+ startTimeAL = System.currentTimeMillis();
+ filesystem.rename(filePath, filePathR);
+ totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
+
+ successfulOp = true;
+ successfulFileOps ++;
+ } catch (IOException e) {
+ TaskTracker.LOG.info("Exception recorded in op: Rename");
+
+ numOfExceptions++;
+ }
+ }
}
+ }
- fileSys = FileSystem.get(jobConf);
- uniqueId = java.net.InetAddress.getLocalHost().getHostName();
- taskDir = new Path(baseDir, uniqueId);
- // initialize buffer used for writing/reading file
- buffer = new byte[(int) Math.min(bytesPerFile, 32768L)];
-
- Date execTime;
- Date endTime;
- long duration;
- int exceptions = 0;
- barrier(); // wait for coordinated start time
- execTime = new Date();
- System.out.println("Job started: " + startTime);
- if (operation.equals("createWrite")) {
- if (!fileSys.mkdirs(taskDir)) {
- throw new IOException("Mkdirs failed to create " + taskDir.toString());
- }
- exceptions = createWrite();
- } else if (operation.equals("openRead")) {
- exceptions = openRead();
- } else if (operation.equals("rename")) {
- exceptions = rename();
- } else if (operation.equals("delete")) {
- exceptions = delete();
- } else {
- System.err.println(usage);
- System.exit(-1);
- }
- endTime = new Date();
- System.out.println("Job ended: " + endTime);
- duration = (endTime.getTime() - execTime.getTime()) /1000;
- System.out.println("The " + operation + " job took " + duration + " seconds.");
- System.out.println("The job recorded " + exceptions + " exceptions.");
+ /**
+ * Delete operation
+ */
+ private void doDeleteOp(String name,
+ OutputCollector<Text, Text> output,
+ Reporter reporter) {
+ for (long l = 0l; l < numberOfFiles; l++) {
+ Path filePath = new Path(new Path(baseDir, dataDirName),
+ name + "_" + l);
+
+ boolean successfulOp = false;
+ while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
+ try {
+ // Set up timer for measuring AL
+ startTimeAL = System.currentTimeMillis();
+ filesystem.delete(filePath);
+ totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
+
+ successfulOp = true;
+ successfulFileOps ++;
+ } catch (IOException e) {
+ TaskTracker.LOG.info("Exception in recorded op: Delete");
+
+ numOfExceptions++;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Reducer class
+ */
+ static class NNBenchReducer extends MapReduceBase
+ implements Reducer<Text, Text, Text, Text> {
+
+ protected String hostName;
+
+ public NNBenchReducer () {
+ TaskTracker.LOG.info("Starting NNBenchReducer !!!");
+ try {
+ hostName = java.net.InetAddress.getLocalHost().getHostName();
+ } catch(Exception e) {
+ hostName = "localhost";
+ }
+ TaskTracker.LOG.info("Starting NNBenchReducer on " + hostName);
+ }
+
+ /**
+ * Reduce method
+ */
+ public void reduce(Text key,
+ Iterator<Text> values,
+ OutputCollector<Text, Text> output,
+ Reporter reporter
+ ) throws IOException {
+ String field = key.toString();
+
+ reporter.setStatus("starting " + field + " ::host = " + hostName);
+
+ // sum long values
+ if (field.startsWith("l:")) {
+ long lSum = 0;
+ while (values.hasNext()) {
+ lSum += Long.parseLong(values.next().toString());
+ }
+ output.collect(key, new Text(String.valueOf(lSum)));
+ }
+
+ if (field.startsWith("min:")) {
+ long minVal = -1;
+ while (values.hasNext()) {
+ long value = Long.parseLong(values.next().toString());
+
+ if (minVal == -1) {
+ minVal = value;
+ } else {
+ if (value != 0 && value < minVal) {
+ minVal = value;
+ }
+ }
+ }
+ output.collect(key, new Text(String.valueOf(minVal)));
+ }
+
+ if (field.startsWith("max:")) {
+ long maxVal = -1;
+ while (values.hasNext()) {
+ long value = Long.parseLong(values.next().toString());
+
+ if (maxVal == -1) {
+ maxVal = value;
+ } else {
+ if (value > maxVal) {
+ maxVal = value;
+ }
+ }
+ }
+ output.collect(key, new Text(String.valueOf(maxVal)));
+ }
+
+ reporter.setStatus("finished " + field + " ::host = " + hostName);
+ }
}
}
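
For reference, analyzeResults() parses the plain-text reduce output
(part-00000 under the output directory). A sketch of what that file might
contain after a create_write run, assuming the default tab-separated
key/value layout of Hadoop's text output and purely illustrative numbers:

    l:numOfExceptions       0
    l:successfulFileOps     4000
    l:totalTimeAL1          61000
    l:totalTimeAL2          23000
    l:totalTimeTPmS         90000
    max:mapEndTimeTPmS      1197327825000
    min:mapStartTimeTPmS    1197327735000

The keys keep the prefixes the mapper emitted (l: for summed values, min:/max:
for extrema), which is why analyzeResults() matches fields with endsWith()
rather than by exact key name.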