Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/07 19:12:20 UTC
svn commit: r483588 - in /lucene/hadoop/trunk: CHANGES.txt
src/examples/org/apache/hadoop/examples/NNBench.java
Author: cutting
Date: Thu Dec 7 10:12:19 2006
New Revision: 483588
URL: http://svn.apache.org/viewvc?view=rev&rev=483588
Log:
HADOOP-763. Change DFS namenode benchmark to not use MapReduce. Contributed by Nigel.
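The benchmark is typically run four times on each participating node, once per operation, in the order createWrite, openRead, rename, delete (see the class javadoc below). A sketch of one such sequence, assuming the class is launched directly through the bin/hadoop script, with illustrative values for the base path, file counts, and coordinated start times:

  bin/hadoop org.apache.hadoop.examples.NNBench -operation createWrite \
      -baseDir /benchmarks/nnbench -startTime 1165514400 \
      -numFiles 1000 -blocksPerFile 1
  bin/hadoop org.apache.hadoop.examples.NNBench -operation openRead \
      -baseDir /benchmarks/nnbench -startTime 1165515000 \
      -numFiles 1000 -blocksPerFile 1
  (then likewise with -operation rename and -operation delete)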
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=483588&r1=483587&r2=483588
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Dec 7 10:12:19 2006
@@ -8,6 +8,9 @@
2. HADOOP-779. Fix contrib/streaming to work correctly with gzipped
input files. (Hairong Kuang via cutting)
+ 3. HADOOP-763. Change DFS namenode benchmark to not use MapReduce.
+ (Nigel Daley via cutting)
+
Release 0.9.0 - 2006-12-01
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java?view=diff&rev=483588&r1=483587&r2=483588
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java Thu Dec 7 10:12:19 2006
@@ -20,206 +20,269 @@
import java.io.IOException;
import java.util.Date;
-import java.util.Iterator;
-import java.util.Random;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.util.Progressable;
/**
- * This program uses map/reduce to run a distributed job where there is
- * no interaction between the tasks. Each task creates a configurable
- * number of files. Each file has a configurable number of bytes
- * written to it, then it is closed, re-opened, and read from, and
- * re-closed. This program functions as a stress-test and benchmark
- * for namenode, especially when the number of bytes written to
- * each file is small.
+ * This program executes a specified operation that applies load to
+ * the NameNode. Possible operations include creating/writing files,
+ * opening/reading files, renaming files, and deleting files.
*
- * @author Milind Bhandarkar
+ * When run simultaneously on multiple nodes, this program functions
+ * as a stress-test and benchmark for namenode, especially when
+ * the number of bytes written to each file is small.
+ *
+ * @author Nigel Daley
*/
-public class NNBench extends MapReduceBase implements Reducer {
-
- public static class Map extends MapReduceBase implements Mapper {
- private FileSystem fileSys = null;
- private int numBytesToWrite;
- private Random random = new Random();
- private String taskId = null;
- private Path topDir = null;
-
- private void randomizeBytes(byte[] data, int offset, int length) {
- for(int i=offset + length - 1; i >= offset; --i) {
- data[i] = (byte) random.nextInt(256);
+public class NNBench {
+ // variables initialized from command line arguments
+ private static long startTime = 0;
+ private static int numFiles = 0;
+ private static long bytesPerBlock = 1;
+ private static long blocksPerFile = 0;
+ private static long bytesPerFile = 1;
+ private static Path baseDir = null;
+
+ // variables initialized in main()
+ private static FileSystem fileSys = null;
+ private static Path taskDir = null;
+ private static String uniqueId = null;
+ private static byte[] buffer;
+
+ /**
+ * Returns when the current number of seconds from the epoch equals
+ * the command line argument given by <code>-startTime</code>.
+ * This allows multiple instances of this program, running on clock
+ * synchronized nodes, to start at roughly the same time.
+ */
+ static void barrier() {
+ long sleepTime;
+ while ((sleepTime = startTime - System.currentTimeMillis()) > 0) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException ex) {
+ }
}
}
/**
- * Given a number of files to create, create and open those files
- * for both writing and reading a given number of bytes.
+ * Create and write to a given number of files. Repeat each remote
+ * operation until it succeeds (does not throw an exception).
+ *
+ * @return the number of exceptions caught
*/
- public void map(WritableComparable key,
- Writable value,
- OutputCollector output,
- Reporter reporter) throws IOException {
- int nFiles = ((IntWritable) value).get();
- Path taskDir = new Path(topDir, taskId);
- if (!fileSys.mkdirs(taskDir)) {
- throw new IOException("Mkdirs failed to create " + taskDir.toString());
- }
- byte[] buffer = new byte[32768];
- for (int index = 0; index < nFiles; index++) {
- FSDataOutputStream out = fileSys.create(
- new Path(taskDir, Integer.toString(index)));
- int toBeWritten = numBytesToWrite;
+ static int createWrite() {
+ int exceptions = 0;
+ FSOutputStream out = null;
+ boolean success = false;
+ for (int index = 0; index < numFiles; index++) {
+ do { // create file until it succeeds
+ try {
+ out = fileSys.createRaw(
+ new Path(taskDir, "" + index), false, (short)1, bytesPerBlock);
+ success = true;
+ } catch (IOException ioe) { success=false; exceptions++; }
+ } while (!success);
+ long toBeWritten = bytesPerFile;
while (toBeWritten > 0) {
- int nbytes = Math.min(buffer.length, toBeWritten);
- randomizeBytes(buffer, 0, nbytes);
+ int nbytes = (int) Math.min(buffer.length, toBeWritten);
toBeWritten -= nbytes;
- out.write(buffer, 0, nbytes);
- reporter.setStatus("wrote " + (numBytesToWrite-toBeWritten) +
- " bytes for "+ index +"th file.");
+ try { // only try once
+ out.write(buffer, 0, nbytes);
+ } catch (IOException ioe) {
+ exceptions++;
+ }
}
- out.close();
+ do { // close file until it succeeds
+ try {
+ out.close();
+ success = true;
+ } catch (IOException ioe) { success=false; exceptions++; }
+ } while (!success);
}
- for (int index = 0; index < nFiles; index++) {
- FSDataInputStream in = fileSys.open(
- new Path(taskDir, Integer.toString(index)));
- int toBeRead = numBytesToWrite;
- while (toBeRead > 0) {
- int nbytes = Math.min(buffer.length, toBeRead);
- toBeRead -= nbytes;
- in.read(buffer, 0, nbytes);
- reporter.setStatus("read " + (numBytesToWrite-toBeRead) +
- " bytes for "+ index +"th file.");
+ return exceptions;
+ }
+
+ /**
+ * Open and read a given number of files.
+ *
+ * @return the number of exceptions caught
+ */
+ static int openRead() {
+ int exceptions = 0;
+ FSInputStream in = null;
+ for (int index = 0; index < numFiles; index++) {
+ try {
+ in = fileSys.openRaw(new Path(taskDir, "" + index));
+ long toBeRead = bytesPerFile;
+ while (toBeRead > 0) {
+ int nbytes = (int) Math.min(buffer.length, toBeRead);
+ toBeRead -= nbytes;
+ try { // only try once
+ in.read(buffer, 0, nbytes);
+ } catch (IOException ioe) {
+ exceptions++;
+ }
+ }
+ in.close();
+ } catch (IOException ioe) {
+ exceptions++;
}
- in.close();
}
- fileSys.delete(taskDir); // clean up after yourself
- }
+ return exceptions;
+ }
+
+ /**
+ * Rename a given number of files. Repeat each remote
+ * operation until it succeeds (does not throw an exception).
+ *
+ * @return the number of exceptions caught
+ */
+ static int rename() {
+ int exceptions = 0;
+ boolean success = false;
+ for (int index = 0; index < numFiles; index++) {
+ do { // rename file until it succeeds
+ try {
+ boolean result = fileSys.renameRaw(
+ new Path(taskDir, "" + index), new Path(taskDir, "A" + index));
+ success = true;
+ } catch (IOException ioe) { success=false; exceptions++; }
+ } while (!success);
+ }
+ return exceptions;
+ }
/**
- * Save the values out of the configuration that we need to write
- * the data.
+ * Delete a given number of files. Repeat each remote
+ * operation until it succeeds (does not throw an exception).
+ *
+ * @return the number of exceptions caught
*/
- public void configure(JobConf job) {
- try {
- fileSys = FileSystem.get(job);
- } catch (IOException e) {
- throw new RuntimeException("Can't get default file system", e);
- }
- numBytesToWrite = job.getInt("test.nnbench.bytes_per_file", 0);
- topDir = new Path(job.get("test.nnbench.topdir", "/nnbench"));
- taskId = job.get("mapred.task.id", (new Long(random.nextLong())).toString());
- }
-
- }
-
- public void reduce(WritableComparable key,
- Iterator values,
- OutputCollector output,
- Reporter reporter) throws IOException {
- // nothing
- }
-
+ static int delete() {
+ int exceptions = 0;
+ boolean success = false;
+ for (int index = 0; index < numFiles; index++) {
+ do { // delete file until it succeeds
+ try {
+ boolean result = fileSys.deleteRaw(new Path(taskDir, "A" + index));
+ success = true;
+ } catch (IOException ioe) { success=false; exceptions++; }
+ } while (!success);
+ }
+ return exceptions;
+ }
+
/**
- * This is the main routine for launching a distributed namenode stress test.
- * It runs 10 maps/node. The reduce doesn't do anything.
- *
- * @throws IOException
+ * This launches a given namenode operation (<code>-operation</code>),
+ * starting at a given time (<code>-startTime</code>). The files used
+ * by the openRead, rename, and delete operations are the same files
+ * created by the createWrite operation. Typically, the program
+ * would be run four times, once for each operation in this order:
+ * createWrite, openRead, rename, delete.
+ *
+ * <pre>
+ * Usage: nnbench
+ * -operation <one of createWrite, openRead, rename, or delete>
+ * -baseDir <base output/input DFS path>
+ * -startTime <time to start, given in seconds from the epoch>
+ * -numFiles <number of files to create, read, rename, or delete>
+ * -blocksPerFile <number of blocks to create per file>
+ * [-bytesPerBlock <number of bytes to write to each block, default is 1>]
+ * </pre>
+ *
+ * @throws IOException indicates a problem with test startup
*/
public static void main(String[] args) throws IOException {
- Configuration defaults = new Configuration();
- if (args.length != 3) {
- System.out.println("Usage: nnbench <out-dir> <filesPerMap> <bytesPerFile>");
- return;
- }
- Path outDir = new Path(args[0]);
- int filesPerMap = Integer.parseInt(args[1]);
- int numBytesPerFile = Integer.parseInt(args[2]);
-
- JobConf jobConf = new JobConf(defaults, NNBench.class);
- jobConf.setJobName("nnbench");
- jobConf.setInt("test.nnbench.bytes_per_file", numBytesPerFile);
- jobConf.set("test.nnbench.topdir", args[0]);
-
- // turn off speculative execution, because DFS doesn't handle
- // multiple writers to the same file.
- jobConf.setSpeculativeExecution(false);
- jobConf.setInputFormat(SequenceFileInputFormat.class);
- jobConf.setOutputKeyClass(BytesWritable.class);
- jobConf.setOutputValueClass(BytesWritable.class);
-
- jobConf.setMapperClass(Map.class);
- jobConf.setReducerClass(NNBench.class);
-
- JobClient client = new JobClient(jobConf);
- ClusterStatus cluster = client.getClusterStatus();
- int numMaps = cluster.getTaskTrackers() *
- jobConf.getInt("test.nnbench.maps_per_host", 10);
- jobConf.setNumMapTasks(numMaps);
- System.out.println("Running " + numMaps + " maps.");
- jobConf.setNumReduceTasks(1);
-
- Path tmpDir = new Path("random-work");
- Path inDir = new Path(tmpDir, "in");
- Path fakeOutDir = new Path(tmpDir, "out");
- FileSystem fileSys = FileSystem.get(jobConf);
- if (fileSys.exists(outDir)) {
- System.out.println("Error: Output directory " + outDir +
- " already exists.");
- return;
- }
- fileSys.delete(tmpDir);
- if (!fileSys.mkdirs(inDir)) {
- System.out.println("Error: Mkdirs failed to create " +
- inDir.toString());
- return;
- }
-
- for(int i=0; i < numMaps; ++i) {
- Path file = new Path(inDir, "part"+i);
- SequenceFile.Writer writer = SequenceFile.createWriter(fileSys,
- jobConf, file,
- IntWritable.class, IntWritable.class,
- CompressionType.NONE,
- (Progressable)null);
- writer.append(new IntWritable(0), new IntWritable(filesPerMap));
- writer.close();
+ String version = "NameNodeBenchmark.0.3";
+ System.out.println(version);
+
+ String usage =
+ "Usage: nnbench " +
+ "-operation <one of createWrite, openRead, rename, or delete> " +
+ "-baseDir <base output/input DFS path> " +
+ "-startTime <time to start, given in seconds from the epoch> " +
+ "-numFiles <number of files to create> " +
+ "-blocksPerFile <number of blocks to create per file> " +
+ "[-bytesPerBlock <number of bytes to write to each block, default is 1>]";
+
+ String operation = null;
+ for (int i = 0; i < args.length; i++) { // parse command line
+ if (args[i].equals("-baseDir")) {
+ baseDir = new Path(args[++i]);
+ } else if (args[i].equals("-numFiles")) {
+ numFiles = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-blocksPerFile")) {
+ blocksPerFile = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-bytesPerBlock")) {
+ bytesPerBlock = Long.parseLong(args[++i]);
+ } else if (args[i].equals("-startTime")) {
+ startTime = Long.parseLong(args[++i]) * 1000;
+ } else if (args[i].equals("-operation")) {
+ operation = args[++i];
+ } else {
+ System.out.println(usage);
+ System.exit(-1);
+ }
}
- jobConf.setInputPath(inDir);
- jobConf.setOutputPath(fakeOutDir);
+ bytesPerFile = bytesPerBlock * blocksPerFile;
- // Uncomment to run locally in a single process
- //job_conf.set("mapred.job.tracker", "local");
+ System.out.println("Inputs: ");
+ System.out.println(" operation: " + operation);
+ System.out.println(" baseDir: " + baseDir);
+ System.out.println(" startTime: " + startTime);
+ System.out.println(" numFiles: " + numFiles);
+ System.out.println(" blocksPerFile: " + blocksPerFile);
+ System.out.println(" bytesPerBlock: " + bytesPerBlock);
+
+ if (operation == null || // verify args
+ baseDir == null ||
+ numFiles < 1 ||
+ blocksPerFile < 1 ||
+ bytesPerBlock < 0)
+ {
+ System.err.println(usage);
+ System.exit(-1);
+ }
- Date startTime = new Date();
+ JobConf jobConf = new JobConf(new Configuration(), NNBench.class);
+ fileSys = FileSystem.get(jobConf);
+ uniqueId = java.net.InetAddress.getLocalHost().getHostName();
+ taskDir = new Path(baseDir, uniqueId);
+ // initialize buffer used for writing/reading file
+ buffer = new byte[(int) Math.min(bytesPerFile, 32768L)];
+
+ Date execTime;
+ Date endTime;
+ long duration;
+ int exceptions = 0;
+ barrier(); // wait for coordinated start time
+ execTime = new Date();
System.out.println("Job started: " + startTime);
- try {
- JobClient.runJob(jobConf);
- Date endTime = new Date();
- System.out.println("Job ended: " + endTime);
- System.out.println("The job took " +
- (endTime.getTime() - startTime.getTime()) /1000 + " seconds.");
- } finally {
- fileSys.delete(tmpDir);
+ if (operation.equals("createWrite")) {
+ if (!fileSys.mkdirs(taskDir)) {
+ throw new IOException("Mkdirs failed to create " + taskDir.toString());
+ }
+ exceptions = createWrite();
+ } else if (operation.equals("openRead")) {
+ exceptions = openRead();
+ } else if (operation.equals("rename")) {
+ exceptions = rename();
+ } else if (operation.equals("delete")) {
+ exceptions = delete();
+ } else {
+ System.err.println(usage);
+ System.exit(-1);
}
+ endTime = new Date();
+ System.out.println("Job ended: " + endTime);
+ duration = (endTime.getTime() - execTime.getTime()) /1000;
+ System.out.println("The " + operation + " job took " + duration + " seconds.");
+ System.out.println("The job recorded " + exceptions + " exceptions.");
}
}
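The createWrite, rename, and delete operations above share a retry idiom: each remote namenode call is repeated until it succeeds, and the exceptions caught along the way are counted and reported. A minimal standalone sketch of that idiom, with a hypothetical RemoteOp interface and a simulated transient failure standing in for the real namenode calls:

import java.io.IOException;

public class RetryIdiomSketch {
  /** One remote namenode call; hypothetical, for illustration only. */
  interface RemoteOp {
    void run() throws IOException;
  }

  /**
   * Repeat op until it succeeds, as NNBench does for create, close,
   * rename, and delete. Returns the number of exceptions caught.
   */
  static int retryUntilSuccess(RemoteOp op) {
    int exceptions = 0;
    boolean success = false;
    do {
      try {
        op.run();
        success = true;
      } catch (IOException ioe) {
        success = false;
        exceptions++;
      }
    } while (!success);
    return exceptions;
  }

  public static void main(String[] args) {
    // Simulate an operation that fails twice before succeeding.
    final int[] attempts = { 0 };
    int caught = retryUntilSuccess(new RemoteOp() {
      public void run() throws IOException {
        if (attempts[0]++ < 2) {
          throw new IOException("simulated transient failure");
        }
      }
    });
    System.out.println("Caught " + caught + " exceptions before success.");
  }
}

Note that writes inside createWrite are deliberately tried only once, so a write failure is counted but not retried; only the metadata operations that load the namenode (create, close, rename, delete) loop until success.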