Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:35:06 UTC
svn commit: r1077042 - in
/hadoop/common/branches/branch-0.20-security-patches: ivy/
src/test/org/apache/hadoop/fs/ src/test/org/apache/hadoop/fs/loadGenerator/
src/test/org/apache/hadoop/hdfs/
Author: omalley
Date: Fri Mar 4 03:35:06 2011
New Revision: 1077042
URL: http://svn.apache.org/viewvc?rev=1077042&view=rev
Log:
commit 7a2830662462e5547c8c0a576812a5e3b419f961
Author: Erik Steffl <st...@yahoo-inc.com>
Date: Wed Oct 28 12:43:46 2009 -0700
HDFS-587 from http://issues.apache.org/jira/secure/attachment/12422760/jira.HDFS-587.branch-0.20-internal.1.patch
+++ b/YAHOO-CHANGES.txt
+ HDFS-587. Add ability to run HDFS with MR test on a non-default queue;
+ also updated the junit dependency from junit-3.8.1 to junit-4.5 (to make
+ it possible to use Configured and Tool to process the command line and
+ specify a queue). Contributed by Erik Steffl.
+
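For reference, the Configured/Tool pattern that this patch applies to the
benchmarks below looks roughly like the following minimal sketch (class name
hypothetical). ToolRunner feeds the command line through Hadoop's generic
options parser, so a queue can be chosen at launch time with an option such
as -Dmapred.job.queue.name=<queue> before the tool-specific arguments are
parsed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class QueueAwareBenchmark extends Configured implements Tool {

      @Override
      public int run(String[] args) throws Exception {
        // getConf() already reflects any -D generic options, including
        // the queue name, by the time run() is invoked.
        Configuration conf = getConf();
        System.out.println("queue = "
            + conf.get("mapred.job.queue.name", "default"));
        return 0;
      }

      public static void main(String[] args) throws Exception {
        // ToolRunner strips the generic options and passes the rest to run().
        System.exit(ToolRunner.run(new QueueAwareBenchmark(), args));
      }
    }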
Modified:
hadoop/common/branches/branch-0.20-security-patches/ivy/libraries.properties
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DFSCIOTest.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/IOMapperBase.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestDFSIO.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/loadGenerator/TestLoadGenerator.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBench.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBenchWithoutMR.java
Modified: hadoop/common/branches/branch-0.20-security-patches/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/ivy/libraries.properties?rev=1077042&r1=1077041&r2=1077042&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/ivy/libraries.properties (original)
+++ hadoop/common/branches/branch-0.20-security-patches/ivy/libraries.properties Fri Mar 4 03:35:06 2011
@@ -48,7 +48,7 @@ jsp-api.version=5.5.12
jets3t.version=0.6.1
jetty.version=6.1.14
jetty-util.version=6.1.14
-junit.version=3.8.1
+junit.version=4.5
jdiff.version=1.0.9
json.version=1.0
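The junit bump is what makes the Tool conversion possible: JUnit 4 discovers
tests through the @Test annotation rather than through TestCase inheritance,
leaving each benchmark class free to extend Configured and implement Tool
instead. An illustrative sketch of the migration shape used throughout this
patch (class and method names hypothetical):

    import static org.junit.Assert.assertEquals;

    import org.junit.Test;

    public class ExampleTest {        // no longer "extends TestCase"
      @Test                           // found by annotation, not by the
      public void testSomething() {   // "test" method-name prefix
        assertEquals(4, 2 + 2);
      }
    }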
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/AccumulatingReducer.java?rev=1077042&r1=1077041&r2=1077042&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/AccumulatingReducer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/AccumulatingReducer.java Fri Mar 4 03:35:06 2011
@@ -22,12 +22,8 @@ import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
/**
* Reducer that accumulates values based on their type.
@@ -45,8 +41,12 @@ import org.apache.hadoop.mapred.Reporter
* </ul>
*
*/
+@SuppressWarnings("deprecation")
public class AccumulatingReducer extends MapReduceBase
- implements Reducer<UTF8, UTF8, UTF8, UTF8> {
+ implements Reducer<Text, Text, Text, Text> {
+ static final String VALUE_TYPE_LONG = "l:";
+ static final String VALUE_TYPE_FLOAT = "f:";
+ static final String VALUE_TYPE_STRING = "s:";
private static final Log LOG = LogFactory.getLog(AccumulatingReducer.class);
protected String hostName;
@@ -61,9 +61,9 @@ public class AccumulatingReducer extends
LOG.info("Starting AccumulatingReducer on " + hostName);
}
- public void reduce(UTF8 key,
- Iterator<UTF8> values,
- OutputCollector<UTF8, UTF8> output,
+ public void reduce(Text key,
+ Iterator<Text> values,
+ OutputCollector<Text, Text> output,
Reporter reporter
) throws IOException {
String field = key.toString();
@@ -71,30 +71,30 @@ public class AccumulatingReducer extends
reporter.setStatus("starting " + field + " ::host = " + hostName);
// concatenate strings
- if (field.startsWith("s:")) {
- String sSum = "";
+ if (field.startsWith(VALUE_TYPE_STRING)) {
+ StringBuffer sSum = new StringBuffer();
while (values.hasNext())
- sSum += values.next().toString() + ";";
- output.collect(key, new UTF8(sSum));
+ sSum.append(values.next().toString()).append(";");
+ output.collect(key, new Text(sSum.toString()));
reporter.setStatus("finished " + field + " ::host = " + hostName);
return;
}
// sum float values
- if (field.startsWith("f:")) {
+ if (field.startsWith(VALUE_TYPE_FLOAT)) {
float fSum = 0;
while (values.hasNext())
fSum += Float.parseFloat(values.next().toString());
- output.collect(key, new UTF8(String.valueOf(fSum)));
+ output.collect(key, new Text(String.valueOf(fSum)));
reporter.setStatus("finished " + field + " ::host = " + hostName);
return;
}
// sum long values
- if (field.startsWith("l:")) {
+ if (field.startsWith(VALUE_TYPE_LONG)) {
long lSum = 0;
while (values.hasNext()) {
lSum += Long.parseLong(values.next().toString());
}
- output.collect(key, new UTF8(String.valueOf(lSum)));
+ output.collect(key, new Text(String.valueOf(lSum)));
}
reporter.setStatus("finished " + field + " ::host = " + hostName);
}
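The VALUE_TYPE_* constants introduced above define a small contract between
mappers and this reducer: values arriving under an "l:" key are summed as
longs, "f:" values are summed as floats, and "s:" values are concatenated
with ';'. A hedged sketch of a mapper-side emitter honoring that contract
(helper class hypothetical; the literal prefixes mirror the constants):

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.OutputCollector;

    class StatsEmitter {
      // Tag each statistic with its value-type prefix so that
      // AccumulatingReducer knows how to fold the per-task values together.
      static void emit(OutputCollector<Text, Text> output,
                       long size, float rate, String badBlock)
          throws IOException {
        output.collect(new Text("l:size"),               // summed as long
                       new Text(String.valueOf(size)));
        output.collect(new Text("f:rate"),               // summed as float
                       new Text(String.valueOf(rate)));
        output.collect(new Text("s:badBlocks"),          // concatenated
                       new Text(badBlock));
      }
    }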
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DFSCIOTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DFSCIOTest.java?rev=1077042&r1=1077041&r2=1077042&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DFSCIOTest.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DFSCIOTest.java Fri Mar 4 03:35:06 2011
@@ -18,18 +18,28 @@
package org.apache.hadoop.fs;
-import java.io.*;
-
-import junit.framework.TestCase;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
import java.util.Date;
import java.util.StringTokenizer;
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.io.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
/**
* Distributed i/o benchmark.
@@ -58,8 +68,9 @@ import org.apache.hadoop.conf.*;
* <li>standard i/o rate deviation</li>
* </ul>
*/
-public class DFSCIOTest extends TestCase {
+public class DFSCIOTest extends Configured implements Tool {
// Constants
+ private static final Log LOG = LogFactory.getLog(DFSCIOTest.class);
private static final int TEST_TYPE_READ = 0;
private static final int TEST_TYPE_WRITE = 1;
private static final int TEST_TYPE_CLEANUP = 2;
@@ -67,7 +78,6 @@ public class DFSCIOTest extends TestCase
private static final String BASE_FILE_NAME = "test_io_";
private static final String DEFAULT_RES_FILE_NAME = "DFSCIOTest_results.log";
- private static final Log LOG = FileInputFormat.LOG;
private static Configuration fsConfig = new Configuration();
private static final long MEGA = 0x100000;
private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/DFSCIOTest");
@@ -88,6 +98,7 @@ public class DFSCIOTest extends TestCase
*
* @throws Exception
*/
+ @Test
public void testIOs() throws Exception {
testIOs(10, 10);
}
@@ -124,9 +135,9 @@ public class DFSCIOTest extends TestCase
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
- UTF8.class, LongWritable.class,
+ Text.class, LongWritable.class,
CompressionType.NONE);
- writer.append(new UTF8(name), new LongWritable(fileSize));
+ writer.append(new Text(name), new LongWritable(fileSize));
} catch(Exception e) {
throw new IOException(e.getLocalizedMessage());
} finally {
@@ -154,26 +165,30 @@ public class DFSCIOTest extends TestCase
* <li>i/o rate squared</li>
* </ul>
*/
- private abstract static class IOStatMapper extends IOMapperBase {
+ private abstract static class IOStatMapper extends IOMapperBase<Long> {
IOStatMapper() {
- super(fsConfig);
}
- void collectStats(OutputCollector<UTF8, UTF8> output,
+ void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
- Object objSize) throws IOException {
- long totalSize = ((Long)objSize).longValue();
+ Long objSize) throws IOException {
+ long totalSize = objSize.longValue();
float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
LOG.info("Number of bytes processed = " + totalSize);
LOG.info("Exec time = " + execTime);
LOG.info("IO rate = " + ioRateMbSec);
- output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
- output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
- output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
- output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
- output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
+ new Text(String.valueOf(1)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
+ new Text(String.valueOf(totalSize)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
+ new Text(String.valueOf(execTime)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
+ new Text(String.valueOf(ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
+ new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
}
}
@@ -188,7 +203,7 @@ public class DFSCIOTest extends TestCase
buffer[i] = (byte)('0' + i % 50);
}
- public Object doIO(Reporter reporter,
+ public Long doIO(Reporter reporter,
String name,
long totalSize
) throws IOException {
@@ -274,8 +289,8 @@ public class DFSCIOTest extends TestCase
job.setReducerClass(AccumulatingReducer.class);
FileOutputFormat.setOutputPath(job, outputDir);
- job.setOutputKeyClass(UTF8.class);
- job.setOutputValueClass(UTF8.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
@@ -289,7 +304,7 @@ public class DFSCIOTest extends TestCase
super();
}
- public Object doIO(Reporter reporter,
+ public Long doIO(Reporter reporter,
String name,
long totalSize
) throws IOException {
@@ -384,7 +399,79 @@ public class DFSCIOTest extends TestCase
MEGA*fileSize);
}
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new DFSCIOTest(), args);
+ System.exit(res);
+ }
+
+ private static void analyzeResult( FileSystem fs,
+ int testType,
+ long execTime,
+ String resFileName
+ ) throws IOException {
+ Path reduceFile;
+ if (testType == TEST_TYPE_WRITE)
+ reduceFile = new Path(WRITE_DIR, "part-00000");
+ else
+ reduceFile = new Path(READ_DIR, "part-00000");
+ DataInputStream in;
+ in = new DataInputStream(fs.open(reduceFile));
+
+ BufferedReader lines;
+ lines = new BufferedReader(new InputStreamReader(in));
+ long tasks = 0;
+ long size = 0;
+ long time = 0;
+ float rate = 0;
+ float sqrate = 0;
+ String line;
+ while((line = lines.readLine()) != null) {
+ StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
+ String attr = tokens.nextToken();
+ if (attr.endsWith(":tasks"))
+ tasks = Long.parseLong(tokens.nextToken());
+ else if (attr.endsWith(":size"))
+ size = Long.parseLong(tokens.nextToken());
+ else if (attr.endsWith(":time"))
+ time = Long.parseLong(tokens.nextToken());
+ else if (attr.endsWith(":rate"))
+ rate = Float.parseFloat(tokens.nextToken());
+ else if (attr.endsWith(":sqrate"))
+ sqrate = Float.parseFloat(tokens.nextToken());
+ }
+
+ double med = rate / 1000 / tasks;
+ double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
+ String resultLines[] = {
+ "----- DFSCIOTest ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
+ (testType == TEST_TYPE_READ) ? "read" :
+ "unknown"),
+ " Date & time: " + new Date(System.currentTimeMillis()),
+ " Number of files: " + tasks,
+ "Total MBytes processed: " + size/MEGA,
+ " Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
+ "Average IO rate mb/sec: " + med,
+ " Std IO rate deviation: " + stdDev,
+ " Test exec time sec: " + (float)execTime / 1000,
+ "" };
+
+ PrintStream res = new PrintStream(
+ new FileOutputStream(
+ new File(resFileName), true));
+ for(int i = 0; i < resultLines.length; i++) {
+ LOG.info(resultLines[i]);
+ res.println(resultLines[i]);
+ }
+ }
+
+ private static void cleanup(FileSystem fs) throws Exception {
+ LOG.info("Cleaning up test files");
+ fs.delete(new Path(TEST_ROOT_DIR), true);
+ fs.delete(HDFS_TEST_DIR, true);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
int testType = TEST_TYPE_READ;
int bufferSize = DEFAULT_BUFFER_SIZE;
int fileSize = 1;
@@ -449,11 +536,11 @@ public class DFSCIOTest extends TestCase
long execTime = System.currentTimeMillis() - tStart;
String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
LOG.info(resultLine);
- return;
+ return 0;
}
if (testType == TEST_TYPE_CLEANUP) {
cleanup(fs);
- return;
+ return 0;
}
createControlFile(fs, fileSize, nrFiles);
long tStart = System.currentTimeMillis();
@@ -466,73 +553,8 @@ public class DFSCIOTest extends TestCase
analyzeResult(fs, testType, execTime, resFileName);
} catch(Exception e) {
System.err.print(e.getLocalizedMessage());
- System.exit(-1);
- }
- }
-
- private static void analyzeResult( FileSystem fs,
- int testType,
- long execTime,
- String resFileName
- ) throws IOException {
- Path reduceFile;
- if (testType == TEST_TYPE_WRITE)
- reduceFile = new Path(WRITE_DIR, "part-00000");
- else
- reduceFile = new Path(READ_DIR, "part-00000");
- DataInputStream in;
- in = new DataInputStream(fs.open(reduceFile));
-
- BufferedReader lines;
- lines = new BufferedReader(new InputStreamReader(in));
- long tasks = 0;
- long size = 0;
- long time = 0;
- float rate = 0;
- float sqrate = 0;
- String line;
- while((line = lines.readLine()) != null) {
- StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
- String attr = tokens.nextToken();
- if (attr.endsWith(":tasks"))
- tasks = Long.parseLong(tokens.nextToken());
- else if (attr.endsWith(":size"))
- size = Long.parseLong(tokens. nextToken());
- else if (attr.endsWith(":time"))
- time = Long.parseLong(tokens.nextToken());
- else if (attr.endsWith(":rate"))
- rate = Float.parseFloat(tokens.nextToken());
- else if (attr.endsWith(":sqrate"))
- sqrate = Float.parseFloat(tokens.nextToken());
- }
-
- double med = rate / 1000 / tasks;
- double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
- String resultLines[] = {
- "----- DFSCIOTest ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
- (testType == TEST_TYPE_READ) ? "read" :
- "unknown"),
- " Date & time: " + new Date(System.currentTimeMillis()),
- " Number of files: " + tasks,
- "Total MBytes processed: " + size/MEGA,
- " Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
- "Average IO rate mb/sec: " + med,
- " Std IO rate deviation: " + stdDev,
- " Test exec time sec: " + (float)execTime / 1000,
- "" };
-
- PrintStream res = new PrintStream(
- new FileOutputStream(
- new File(resFileName), true));
- for(int i = 0; i < resultLines.length; i++) {
- LOG.info(resultLines[i]);
- res.println(resultLines[i]);
+ return -1;
}
- }
-
- private static void cleanup(FileSystem fs) throws Exception {
- LOG.info("Cleaning up test files");
- fs.delete(new Path(TEST_ROOT_DIR), true);
- fs.delete(HDFS_TEST_DIR, true);
+ return 0;
}
}
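Note how analyzeResult() above recovers aggregate statistics from only two
running sums: each mapper emits ioRate*1000 under "f:rate" and
ioRate^2*1000 under "f:sqrate", the reducer adds them up, and the driver
derives both the mean and the standard deviation without keeping per-task
samples. A minimal sketch of that arithmetic (names hypothetical):

    class IOStats {
      // mean = E[x]; stdDev = sqrt(E[x^2] - E[x]^2); abs() guards against
      // tiny negative differences caused by floating-point rounding.
      static double[] meanAndStdDev(float rateSum, float sqrateSum,
                                    long tasks) {
        double mean = rateSum / 1000.0 / tasks;
        double meanSq = sqrateSum / 1000.0 / tasks;
        double stdDev = Math.sqrt(Math.abs(meanSq - mean * mean));
        return new double[] { mean, stdDev };
      }
    }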
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DistributedFSCheck.java?rev=1077042&r1=1077041&r2=1077042&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DistributedFSCheck.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DistributedFSCheck.java Fri Mar 4 03:35:06 2011
@@ -18,20 +18,31 @@
package org.apache.hadoop.fs;
-import java.io.*;
-
-import junit.framework.TestCase;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
import java.util.Date;
import java.util.StringTokenizer;
import java.util.TreeSet;
import java.util.Vector;
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.io.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
/**
* Distributed checkup of the file system consistency.
@@ -43,8 +54,9 @@ import org.apache.hadoop.conf.*;
* Optionally displays statistics on read performance.
*
*/
-public class DistributedFSCheck extends TestCase {
+public class DistributedFSCheck extends Configured implements Tool {
// Constants
+ private static final Log LOG = LogFactory.getLog(DistributedFSCheck.class);
private static final int TEST_TYPE_READ = 0;
private static final int TEST_TYPE_CLEANUP = 2;
private static final int DEFAULT_BUFFER_SIZE = 1000000;
@@ -52,7 +64,6 @@ public class DistributedFSCheck extends
private static final long MEGA = 0x100000;
private static Configuration fsConfig = new Configuration();
- private static final Log LOG = FileInputFormat.LOG;
private static Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/benchmarks/DistributedFSCheck"));
private static Path MAP_INPUT_DIR = new Path(TEST_ROOT_DIR, "map_input");
private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
@@ -70,6 +81,7 @@ public class DistributedFSCheck extends
*
* @throws Exception
*/
+ @Test
public void testFSBlocks() throws Exception {
testFSBlocks("/");
}
@@ -92,7 +104,7 @@ public class DistributedFSCheck extends
Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
SequenceFile.Writer writer =
SequenceFile.createWriter(fs, fsConfig, inputFile,
- UTF8.class, LongWritable.class, CompressionType.NONE);
+ Text.class, LongWritable.class, CompressionType.NONE);
try {
nrFiles = 0;
@@ -106,30 +118,41 @@ public class DistributedFSCheck extends
private void listSubtree(Path rootFile,
SequenceFile.Writer writer
) throws IOException {
- if (!fs.isDirectory(rootFile)) {
+ FileStatus rootStatus = fs.getFileStatus(rootFile);
+ listSubtree(rootStatus, writer);
+ }
+
+ private void listSubtree(FileStatus rootStatus,
+ SequenceFile.Writer writer
+ ) throws IOException {
+ Path rootFile = rootStatus.getPath();
+ if (!rootStatus.isDir()) {
nrFiles++;
// For a regular file generate <fName,offset> pairs
long blockSize = fs.getDefaultBlockSize();
- long fileLength = fs.getLength(rootFile);
+ long fileLength = rootStatus.getLen();
for(long offset = 0; offset < fileLength; offset += blockSize)
- writer.append(new UTF8(rootFile.toString()), new LongWritable(offset));
+ writer.append(new Text(rootFile.toString()), new LongWritable(offset));
return;
}
- FileStatus children[] = fs.listStatus(rootFile);
- if (children == null)
+ FileStatus [] children = null;
+ try {
+ children = fs.listStatus(rootFile);
+ } catch (FileNotFoundException fnfe) {
throw new IOException("Could not get listing for " + rootFile);
+ }
+
for (int i = 0; i < children.length; i++)
- listSubtree(children[i].getPath(), writer);
+ listSubtree(children[i], writer);
}
/**
* DistributedFSCheck mapper class.
*/
- public static class DistributedFSCheckMapper extends IOMapperBase {
+ public static class DistributedFSCheckMapper extends IOMapperBase<Object> {
public DistributedFSCheckMapper() {
- super(fsConfig);
}
public Object doIO(Reporter reporter,
@@ -163,14 +186,17 @@ public class DistributedFSCheck extends
return new Long(actualSize);
}
- void collectStats(OutputCollector<UTF8, UTF8> output,
+ void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
Object corruptedBlock) throws IOException {
- output.collect(new UTF8("l:blocks"), new UTF8(String.valueOf(1)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "blocks"),
+ new Text(String.valueOf(1)));
if (corruptedBlock.getClass().getName().endsWith("String")) {
- output.collect(new UTF8("s:badBlocks"), new UTF8((String)corruptedBlock));
+ output.collect(
+ new Text(AccumulatingReducer.VALUE_TYPE_STRING + "badBlocks"),
+ new Text((String)corruptedBlock));
return;
}
long totalSize = ((Long)corruptedBlock).longValue();
@@ -179,9 +205,12 @@ public class DistributedFSCheck extends
LOG.info("Exec time = " + execTime);
LOG.info("IO rate = " + ioRateMbSec);
- output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
- output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
- output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
+ new Text(String.valueOf(totalSize)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
+ new Text(String.valueOf(execTime)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
+ new Text(String.valueOf(ioRateMbSec*1000)));
}
}
@@ -195,59 +224,17 @@ public class DistributedFSCheck extends
job.setReducerClass(AccumulatingReducer.class);
FileOutputFormat.setOutputPath(job, READ_DIR);
- job.setOutputKeyClass(UTF8.class);
- job.setOutputValueClass(UTF8.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
- public static void main(String[] args) throws Exception {
- int testType = TEST_TYPE_READ;
- int bufferSize = DEFAULT_BUFFER_SIZE;
- String resFileName = DEFAULT_RES_FILE_NAME;
- String rootName = "/";
- boolean viewStats = false;
-
- String usage = "Usage: DistributedFSCheck [-root name] [-clean] [-resFile resultFileName] [-bufferSize Bytes] [-stats] ";
-
- if (args.length == 1 && args[0].startsWith("-h")) {
- System.err.println(usage);
- System.exit(-1);
- }
- for(int i = 0; i < args.length; i++) { // parse command line
- if (args[i].equals("-root")) {
- rootName = args[++i];
- } else if (args[i].startsWith("-clean")) {
- testType = TEST_TYPE_CLEANUP;
- } else if (args[i].equals("-bufferSize")) {
- bufferSize = Integer.parseInt(args[++i]);
- } else if (args[i].equals("-resFile")) {
- resFileName = args[++i];
- } else if (args[i].startsWith("-stat")) {
- viewStats = true;
- }
- }
-
- LOG.info("root = " + rootName);
- LOG.info("bufferSize = " + bufferSize);
-
- Configuration conf = new Configuration();
- conf.setInt("test.io.file.buffer.size", bufferSize);
- DistributedFSCheck test = new DistributedFSCheck(conf);
-
- if (testType == TEST_TYPE_CLEANUP) {
- test.cleanup();
- return;
- }
- test.createInputFile(rootName);
- long tStart = System.currentTimeMillis();
- test.runDistributedFSCheck();
- long execTime = System.currentTimeMillis() - tStart;
-
- test.analyzeResult(execTime, resFileName, viewStats);
- // test.cleanup(); // clean up after all to restore the system state
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new DistributedFSCheck(new Configuration()), args);
+ System.exit(res);
}
-
+
private void analyzeResult(long execTime,
String resFileName,
boolean viewStats
@@ -328,4 +315,53 @@ public class DistributedFSCheck extends
LOG.info("Cleaning up test files");
fs.delete(TEST_ROOT_DIR, true);
}
+
+ @Override
+ public int run(String[] args) throws Exception {
+ int testType = TEST_TYPE_READ;
+ int bufferSize = DEFAULT_BUFFER_SIZE;
+ String resFileName = DEFAULT_RES_FILE_NAME;
+ String rootName = "/";
+ boolean viewStats = false;
+
+ String usage = "Usage: DistributedFSCheck [-root name] [-clean] [-resFile resultFileName] [-bufferSize Bytes] [-stats] ";
+
+ if (args.length == 1 && args[0].startsWith("-h")) {
+ System.err.println(usage);
+ return -1;
+ }
+ for(int i = 0; i < args.length; i++) { // parse command line
+ if (args[i].equals("-root")) {
+ rootName = args[++i];
+ } else if (args[i].startsWith("-clean")) {
+ testType = TEST_TYPE_CLEANUP;
+ } else if (args[i].equals("-bufferSize")) {
+ bufferSize = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-resFile")) {
+ resFileName = args[++i];
+ } else if (args[i].startsWith("-stat")) {
+ viewStats = true;
+ }
+ }
+
+ LOG.info("root = " + rootName);
+ LOG.info("bufferSize = " + bufferSize);
+
+ Configuration conf = new Configuration();
+ conf.setInt("test.io.file.buffer.size", bufferSize);
+ DistributedFSCheck test = new DistributedFSCheck(conf);
+
+ if (testType == TEST_TYPE_CLEANUP) {
+ test.cleanup();
+ return 0;
+ }
+ test.createInputFile(rootName);
+ long tStart = System.currentTimeMillis();
+ test.runDistributedFSCheck();
+ long execTime = System.currentTimeMillis() - tStart;
+
+ test.analyzeResult(execTime, resFileName, viewStats);
+ // test.cleanup(); // clean up after all to restore the system state
+ return 0;
+ }
}
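The listSubtree() rework above is also a useful pattern on its own: list
each directory once and recurse on the returned FileStatus objects instead
of re-resolving every child Path, which would cost an extra NameNode RPC
per entry. A standalone sketch of the same idea (helper class hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;

    class TreeWalker {
      static long countFiles(FileSystem fs, FileStatus status)
          throws IOException {
        if (!status.isDir()) {
          return 1;                     // regular file: one leaf
        }
        long n = 0;
        for (FileStatus child : fs.listStatus(status.getPath())) {
          n += countFiles(fs, child);   // reuse the child's FileStatus
        }
        return n;
      }
    }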
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/IOMapperBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/IOMapperBase.java?rev=1077042&r1=1077041&r2=1077042&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/IOMapperBase.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/IOMapperBase.java Fri Mar 4 03:35:06 2011
@@ -19,16 +19,10 @@ package org.apache.hadoop.fs;
import java.io.IOException;
import java.net.InetAddress;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
/**
* Base mapper class for IO operations.
@@ -39,16 +33,20 @@ import org.apache.hadoop.mapred.Reporter
* statistics data to be collected by subsequent reducers.
*
*/
-public abstract class IOMapperBase extends Configured
- implements Mapper<UTF8, LongWritable, UTF8, UTF8> {
+@SuppressWarnings("deprecation")
+public abstract class IOMapperBase<T> extends Configured
+ implements Mapper<Text, LongWritable, Text, Text> {
protected byte[] buffer;
protected int bufferSize;
protected FileSystem fs;
protected String hostName;
- public IOMapperBase(Configuration conf) {
- super(conf);
+ public IOMapperBase() {
+ }
+
+ public void configure(JobConf conf) {
+ setConf(conf);
try {
fs = FileSystem.get(conf);
} catch (Exception e) {
@@ -63,10 +61,6 @@ public abstract class IOMapperBase exten
}
}
- public void configure(JobConf job) {
- setConf(job);
- }
-
public void close() throws IOException {
}
@@ -80,7 +74,7 @@ public abstract class IOMapperBase exten
* {@link #collectStats(OutputCollector,String,long,Object)}
* @throws IOException
*/
- abstract Object doIO(Reporter reporter,
+ abstract T doIO(Reporter reporter,
String name,
long value) throws IOException;
@@ -93,10 +87,10 @@ public abstract class IOMapperBase exten
* @param doIOReturnValue value returned by {@link #doIO(Reporter,String,long)}
* @throws IOException
*/
- abstract void collectStats(OutputCollector<UTF8, UTF8> output,
+ abstract void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
- Object doIOReturnValue) throws IOException;
+ T doIOReturnValue) throws IOException;
/**
* Map file name and offset into statistical data.
@@ -111,9 +105,9 @@ public abstract class IOMapperBase exten
* {@link #collectStats(OutputCollector,String,long,Object)}
* is called to prepare stat data for a subsequent reducer.
*/
- public void map(UTF8 key,
+ public void map(Text key,
LongWritable value,
- OutputCollector<UTF8, UTF8> output,
+ OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String name = key.toString();
long longValue = value.get();
@@ -121,7 +115,7 @@ public abstract class IOMapperBase exten
reporter.setStatus("starting " + name + " ::host = " + hostName);
long tStart = System.currentTimeMillis();
- Object statValue = doIO(reporter, name, longValue);
+ T statValue = doIO(reporter, name, longValue);
long tEnd = System.currentTimeMillis();
long execTime = tEnd - tStart;
collectStats(output, name, execTime, statValue);
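With the generic parameter in place, a subclass picks a concrete type for T
and the base class threads a value of that type from doIO() straight into
collectStats() together with the measured execution time. A hypothetical
subclass showing the contract (it assumes the org.apache.hadoop.fs package,
since doIO and collectStats are package-private):

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    class ByteCountMapper extends IOMapperBase<Long> {
      Long doIO(Reporter reporter, String name, long size)
          throws IOException {
        return size;                    // pretend 'size' bytes were moved
      }

      void collectStats(OutputCollector<Text, Text> output, String name,
                        long execTime, Long bytes) throws IOException {
        // No cast needed: 'bytes' arrives with the type doIO() returned.
        output.collect(new Text("l:size"),
                       new Text(String.valueOf(bytes)));
      }
    }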
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestDFSIO.java?rev=1077042&r1=1077041&r2=1077042&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestDFSIO.java Fri Mar 4 03:35:06 2011
@@ -18,19 +18,30 @@
package org.apache.hadoop.fs;
-import java.io.*;
-
-import junit.framework.TestCase;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
import java.util.Date;
import java.util.StringTokenizer;
-import org.apache.commons.logging.*;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
/**
* Distributed i/o benchmark.
@@ -59,8 +70,9 @@ import org.apache.hadoop.conf.*;
* <li>standard deviation of i/o rate </li>
* </ul>
*/
-public class TestDFSIO extends TestCase {
+public class TestDFSIO extends Configured implements Tool {
// Constants
+ private static final Log LOG = LogFactory.getLog(TestDFSIO.class);
private static final int TEST_TYPE_READ = 0;
private static final int TEST_TYPE_WRITE = 1;
private static final int TEST_TYPE_CLEANUP = 2;
@@ -68,8 +80,6 @@ public class TestDFSIO extends TestCase
private static final String BASE_FILE_NAME = "test_io_";
private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
- private static final Log LOG = FileInputFormat.LOG;
- private static Configuration fsConfig = new Configuration();
private static final long MEGA = 0x100000;
private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO");
private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control");
@@ -77,13 +87,19 @@ public class TestDFSIO extends TestCase
private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data");
+ static {
+ Configuration.addDefaultResource("hdfs-default.xml");
+ Configuration.addDefaultResource("hdfs-site.xml");
+ }
+
/**
* Run the test with default parameters.
*
* @throws Exception
*/
+ @Test
public void testIOs() throws Exception {
- testIOs(10, 10);
+ testIOs(10, 10, new Configuration());
}
/**
@@ -93,21 +109,21 @@ public class TestDFSIO extends TestCase
* @param nrFiles number of files
* @throws IOException
*/
- public static void testIOs(int fileSize, int nrFiles)
+ public static void testIOs(int fileSize, int nrFiles, Configuration fsConfig)
throws IOException {
FileSystem fs = FileSystem.get(fsConfig);
- createControlFile(fs, fileSize, nrFiles);
- writeTest(fs);
- readTest(fs);
+ createControlFile(fs, fileSize, nrFiles, fsConfig);
+ writeTest(fs, fsConfig);
+ readTest(fs, fsConfig);
cleanup(fs);
}
- private static void createControlFile(
- FileSystem fs,
+ private static void createControlFile(FileSystem fs,
int fileSize, // in MB
- int nrFiles
+ int nrFiles,
+ Configuration fsConfig
) throws IOException {
LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files");
@@ -119,9 +135,9 @@ public class TestDFSIO extends TestCase
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
- UTF8.class, LongWritable.class,
+ Text.class, LongWritable.class,
CompressionType.NONE);
- writer.append(new UTF8(name), new LongWritable(fileSize));
+ writer.append(new Text(name), new LongWritable(fileSize));
} catch(Exception e) {
throw new IOException(e.getLocalizedMessage());
} finally {
@@ -149,41 +165,44 @@ public class TestDFSIO extends TestCase
* <li>i/o rate squared</li>
* </ul>
*/
- private abstract static class IOStatMapper extends IOMapperBase {
+ private abstract static class IOStatMapper<T> extends IOMapperBase<T> {
IOStatMapper() {
- super(fsConfig);
}
- void collectStats(OutputCollector<UTF8, UTF8> output,
+ void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
- Object objSize) throws IOException {
- long totalSize = ((Long)objSize).longValue();
+ Long objSize) throws IOException {
+ long totalSize = objSize.longValue();
float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
LOG.info("Number of bytes processed = " + totalSize);
LOG.info("Exec time = " + execTime);
LOG.info("IO rate = " + ioRateMbSec);
- output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
- output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
- output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
- output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
- output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
+ new Text(String.valueOf(1)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
+ new Text(String.valueOf(totalSize)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
+ new Text(String.valueOf(execTime)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
+ new Text(String.valueOf(ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
+ new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
}
}
/**
* Write mapper class.
*/
- public static class WriteMapper extends IOStatMapper {
+ public static class WriteMapper extends IOStatMapper<Long> {
public WriteMapper() {
- super();
for(int i=0; i < bufferSize; i++)
buffer[i] = (byte)('0' + i % 50);
}
- public Object doIO(Reporter reporter,
+ public Long doIO(Reporter reporter,
String name,
long totalSize
) throws IOException {
@@ -205,22 +224,24 @@ public class TestDFSIO extends TestCase
} finally {
out.close();
}
- return new Long(totalSize);
+ return Long.valueOf(totalSize);
}
}
- private static void writeTest(FileSystem fs)
- throws IOException {
+ private static void writeTest(FileSystem fs, Configuration fsConfig)
+ throws IOException {
fs.delete(DATA_DIR, true);
fs.delete(WRITE_DIR, true);
- runIOTest(WriteMapper.class, WRITE_DIR);
+ runIOTest(WriteMapper.class, WRITE_DIR, fsConfig);
}
- private static void runIOTest( Class<? extends Mapper> mapperClass,
- Path outputDir
- ) throws IOException {
+ @SuppressWarnings("deprecation")
+ private static void runIOTest(
+ Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
+ Path outputDir,
+ Configuration fsConfig) throws IOException {
JobConf job = new JobConf(fsConfig, TestDFSIO.class);
FileInputFormat.setInputPaths(job, CONTROL_DIR);
@@ -230,8 +251,8 @@ public class TestDFSIO extends TestCase
job.setReducerClass(AccumulatingReducer.class);
FileOutputFormat.setOutputPath(job, outputDir);
- job.setOutputKeyClass(UTF8.class);
- job.setOutputValueClass(UTF8.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
@@ -239,13 +260,12 @@ public class TestDFSIO extends TestCase
/**
* Read mapper class.
*/
- public static class ReadMapper extends IOStatMapper {
+ public static class ReadMapper extends IOStatMapper<Long> {
public ReadMapper() {
- super();
}
- public Object doIO(Reporter reporter,
+ public Long doIO(Reporter reporter,
String name,
long totalSize
) throws IOException {
@@ -264,22 +284,22 @@ public class TestDFSIO extends TestCase
} finally {
in.close();
}
- return new Long(totalSize);
+ return Long.valueOf(totalSize);
}
}
- private static void readTest(FileSystem fs) throws IOException {
+ private static void readTest(FileSystem fs, Configuration fsConfig)
+ throws IOException {
fs.delete(READ_DIR, true);
- runIOTest(ReadMapper.class, READ_DIR);
+ runIOTest(ReadMapper.class, READ_DIR, fsConfig);
}
- private static void sequentialTest(
- FileSystem fs,
+ private static void sequentialTest(FileSystem fs,
int testType,
int fileSize,
int nrFiles
) throws Exception {
- IOStatMapper ioer = null;
+ IOStatMapper<Long> ioer = null;
if (testType == TEST_TYPE_READ)
ioer = new ReadMapper();
else if (testType == TEST_TYPE_WRITE)
@@ -292,21 +312,102 @@ public class TestDFSIO extends TestCase
MEGA*fileSize);
}
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new TestDFSIO(), args);
+ System.exit(res);
+ }
+
+ private static void analyzeResult( FileSystem fs,
+ int testType,
+ long execTime,
+ String resFileName
+ ) throws IOException {
+ Path reduceFile;
+ if (testType == TEST_TYPE_WRITE)
+ reduceFile = new Path(WRITE_DIR, "part-00000");
+ else
+ reduceFile = new Path(READ_DIR, "part-00000");
+ long tasks = 0;
+ long size = 0;
+ long time = 0;
+ float rate = 0;
+ float sqrate = 0;
+ DataInputStream in = null;
+ BufferedReader lines = null;
+ try {
+ in = new DataInputStream(fs.open(reduceFile));
+ lines = new BufferedReader(new InputStreamReader(in));
+ String line;
+ while((line = lines.readLine()) != null) {
+ StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
+ String attr = tokens.nextToken();
+ if (attr.endsWith(":tasks"))
+ tasks = Long.parseLong(tokens.nextToken());
+ else if (attr.endsWith(":size"))
+ size = Long.parseLong(tokens.nextToken());
+ else if (attr.endsWith(":time"))
+ time = Long.parseLong(tokens.nextToken());
+ else if (attr.endsWith(":rate"))
+ rate = Float.parseFloat(tokens.nextToken());
+ else if (attr.endsWith(":sqrate"))
+ sqrate = Float.parseFloat(tokens.nextToken());
+ }
+ } finally {
+ if(in != null) in.close();
+ if(lines != null) lines.close();
+ }
+
+ double med = rate / 1000 / tasks;
+ double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
+ String resultLines[] = {
+ "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
+ (testType == TEST_TYPE_READ) ? "read" :
+ "unknown"),
+ " Date & time: " + new Date(System.currentTimeMillis()),
+ " Number of files: " + tasks,
+ "Total MBytes processed: " + size/MEGA,
+ " Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
+ "Average IO rate mb/sec: " + med,
+ " IO rate std deviation: " + stdDev,
+ " Test exec time sec: " + (float)execTime / 1000,
+ "" };
+
+ PrintStream res = null;
+ try {
+ res = new PrintStream(new FileOutputStream(new File(resFileName), true));
+ for(int i = 0; i < resultLines.length; i++) {
+ LOG.info(resultLines[i]);
+ res.println(resultLines[i]);
+ }
+ } finally {
+ if(res != null) res.close();
+ }
+ }
+
+ private static void cleanup(FileSystem fs) throws IOException {
+ LOG.info("Cleaning up test files");
+ fs.delete(new Path(TEST_ROOT_DIR), true);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
int testType = TEST_TYPE_READ;
int bufferSize = DEFAULT_BUFFER_SIZE;
int fileSize = 1;
int nrFiles = 1;
String resFileName = DEFAULT_RES_FILE_NAME;
boolean isSequential = false;
-
- String version="TestFDSIO.0.0.4";
- String usage = "Usage: TestFDSIO -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] ";
+
+ String className = TestDFSIO.class.getSimpleName();
+ String version = className + ".0.0.4";
+ String usage = "Usage: " + className + " -read | -write | -clean " +
+ "[-nrFiles N] [-fileSize MB] [-resFile resultFileName] " +
+ "[-bufferSize Bytes] ";
System.out.println(version);
if (args.length == 0) {
System.err.println(usage);
- System.exit(-1);
+ return -1;
}
for (int i = 0; i < args.length; i++) { // parse command line
if (args[i].startsWith("-read")) {
@@ -333,6 +434,7 @@ public class TestDFSIO extends TestCase
LOG.info("bufferSize = " + bufferSize);
try {
+ Configuration fsConfig = new Configuration(getConf());
fsConfig.setInt("test.io.file.buffer.size", bufferSize);
FileSystem fs = FileSystem.get(fsConfig);
@@ -342,89 +444,25 @@ public class TestDFSIO extends TestCase
long execTime = System.currentTimeMillis() - tStart;
String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
LOG.info(resultLine);
- return;
+ return 0;
}
if (testType == TEST_TYPE_CLEANUP) {
cleanup(fs);
- return;
+ return 0;
}
- createControlFile(fs, fileSize, nrFiles);
+ createControlFile(fs, fileSize, nrFiles, fsConfig);
long tStart = System.currentTimeMillis();
if (testType == TEST_TYPE_WRITE)
- writeTest(fs);
+ writeTest(fs, fsConfig);
if (testType == TEST_TYPE_READ)
- readTest(fs);
+ readTest(fs, fsConfig);
long execTime = System.currentTimeMillis() - tStart;
analyzeResult(fs, testType, execTime, resFileName);
} catch(Exception e) {
System.err.print(StringUtils.stringifyException(e));
- System.exit(-1);
- }
- }
-
- private static void analyzeResult( FileSystem fs,
- int testType,
- long execTime,
- String resFileName
- ) throws IOException {
- Path reduceFile;
- if (testType == TEST_TYPE_WRITE)
- reduceFile = new Path(WRITE_DIR, "part-00000");
- else
- reduceFile = new Path(READ_DIR, "part-00000");
- DataInputStream in;
- in = new DataInputStream(fs.open(reduceFile));
-
- BufferedReader lines;
- lines = new BufferedReader(new InputStreamReader(in));
- long tasks = 0;
- long size = 0;
- long time = 0;
- float rate = 0;
- float sqrate = 0;
- String line;
- while((line = lines.readLine()) != null) {
- StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
- String attr = tokens.nextToken();
- if (attr.endsWith(":tasks"))
- tasks = Long.parseLong(tokens.nextToken());
- else if (attr.endsWith(":size"))
- size = Long.parseLong(tokens.nextToken());
- else if (attr.endsWith(":time"))
- time = Long.parseLong(tokens.nextToken());
- else if (attr.endsWith(":rate"))
- rate = Float.parseFloat(tokens.nextToken());
- else if (attr.endsWith(":sqrate"))
- sqrate = Float.parseFloat(tokens.nextToken());
- }
-
- double med = rate / 1000 / tasks;
- double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
- String resultLines[] = {
- "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
- (testType == TEST_TYPE_READ) ? "read" :
- "unknown"),
- " Date & time: " + new Date(System.currentTimeMillis()),
- " Number of files: " + tasks,
- "Total MBytes processed: " + size/MEGA,
- " Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
- "Average IO rate mb/sec: " + med,
- " IO rate std deviation: " + stdDev,
- " Test exec time sec: " + (float)execTime / 1000,
- "" };
-
- PrintStream res = new PrintStream(
- new FileOutputStream(
- new File(resFileName), true));
- for(int i = 0; i < resultLines.length; i++) {
- LOG.info(resultLines[i]);
- res.println(resultLines[i]);
+ return -1;
}
- }
-
- private static void cleanup(FileSystem fs) throws IOException {
- LOG.info("Cleaning up test files");
- fs.delete(new Path(TEST_ROOT_DIR), true);
+ return 0;
}
}
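The static initializer added to TestDFSIO above is easy to overlook but
significant: registering hdfs-default.xml and hdfs-site.xml as default
resources means every Configuration constructed afterwards, including the
one ToolRunner hands to run(), carries the cluster's HDFS settings. A
minimal sketch of the mechanism (class name hypothetical):

    import org.apache.hadoop.conf.Configuration;

    class HdfsAwareConf {
      static {
        Configuration.addDefaultResource("hdfs-default.xml");
        Configuration.addDefaultResource("hdfs-site.xml");
      }

      public static void main(String[] args) {
        // This instance now layers hdfs-*.xml on top of the core defaults.
        Configuration conf = new Configuration();
        System.out.println(conf.get("fs.default.name"));
      }
    }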
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/loadGenerator/TestLoadGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/loadGenerator/TestLoadGenerator.java?rev=1077042&r1=1077041&r2=1077042&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/loadGenerator/TestLoadGenerator.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/loadGenerator/TestLoadGenerator.java Fri Mar 4 03:35:06 2011
@@ -23,13 +23,19 @@ import java.io.FileReader;
import java.io.FileWriter;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import junit.framework.TestCase;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
/**
* This class tests if the load generator works correctly.
*/
-public class TestLoadGenerator extends TestCase {
+public class TestLoadGenerator extends Configured implements Tool {
private static final Configuration CONF = new Configuration();
private static final int DEFAULT_BLOCK_SIZE = 10;
private static final String OUT_DIR =
@@ -53,6 +59,7 @@ public class TestLoadGenerator extends T
}
/** Test if the structure generator works fine */
+ @Test
public void testStructureGenerator() throws Exception {
StructureGenerator sg = new StructureGenerator();
String[] args = new String[]{"-maxDepth", "2", "-minWidth", "1",
@@ -121,6 +128,7 @@ public class TestLoadGenerator extends T
}
/** Test if the load generator works fine */
+ @Test
public void testLoadGenerator() throws Exception {
final String TEST_SPACE_ROOT = "/test";
@@ -209,9 +217,16 @@ public class TestLoadGenerator extends T
/**
* @param args
*/
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new TestLoadGenerator(), args);
+ System.exit(res);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
TestLoadGenerator loadGeneratorTest = new TestLoadGenerator();
loadGeneratorTest.testStructureGenerator();
loadGeneratorTest.testLoadGenerator();
+ return 0;
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBench.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBench.java?rev=1077042&r1=1077041&r2=1077042&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBench.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBench.java Fri Mar 4 03:35:06 2011
@@ -58,6 +58,9 @@ import org.apache.hadoop.mapred.OutputCo
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
/**
* This program executes a specified operation that applies load to
* the NameNode.
@@ -77,7 +80,7 @@ import org.apache.hadoop.mapred.Reducer;
* must be run before running the other operations.
*/
-public class NNBench {
+public class NNBench extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.hdfs.NNBench");
@@ -111,14 +114,17 @@ public class NNBench {
static SimpleDateFormat sdf =
new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss','S");
- private static Configuration config = new Configuration();
+ // private static Configuration config = new Configuration();
/**
* Clean up the files before a test run
*
* @throws IOException on error
*/
- private static void cleanupBeforeTestrun() throws IOException {
+ private static void cleanupBeforeTestrun(
+ Configuration config
+ ) throws IOException {
+
FileSystem tempFS = FileSystem.get(config);
// Delete the data directory only if it is the create/write operation
@@ -136,7 +142,10 @@ public class NNBench {
*
* @throws IOException on error
*/
- private static void createControlFiles() throws IOException {
+ private static void createControlFiles(
+ Configuration config
+ ) throws IOException {
+
FileSystem tempFS = FileSystem.get(config);
LOG.info("Creating " + numberOfMaps + " control files");
@@ -150,13 +159,10 @@ public class NNBench {
writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class,
LongWritable.class, CompressionType.NONE);
writer.append(new Text(strFileName), new LongWritable(0l));
- } catch(Exception e) {
- throw new IOException(e.getLocalizedMessage());
} finally {
if (writer != null) {
writer.close();
}
- writer = null;
}
}
}
@@ -210,6 +216,9 @@ public class NNBench {
/**
* check for arguments and fail if the values are not specified
+ * @param index positional number of an argument in the list of command
+ * line's arguments
+ * @param length total number of arguments
*/
public static void checkArgs(final int index, final int length) {
if (index == length) {
@@ -220,10 +229,10 @@ public class NNBench {
/**
* Parse input arguments
- *
- * @params args Command line inputs
+ *
+ * @param args array of command-line parameters to be parsed
*/
- public static void parseInputs(final String[] args) {
+ public static void parseInputs(final String[] args, Configuration config) {
// If there are no command line arguments, exit
if (args.length == 0) {
displayUsage();
@@ -307,7 +316,10 @@ public class NNBench {
*
* @throws IOException on error
*/
- private static void analyzeResults() throws IOException {
+ private static void analyzeResults(
+ Configuration config
+ ) throws IOException {
+
final FileSystem fs = FileSystem.get(config);
Path reduceFile = new Path(new Path(baseDir, OUTPUT_DIR_NAME),
"part-00000");
@@ -358,8 +370,8 @@ public class NNBench {
// Average latency is the average time to perform 'n' number of
// operations, n being the number of files
- double avgLatency1 = (double) totalTimeAL1 / (double) successfulFileOps;
- double avgLatency2 = (double) totalTimeAL2 / (double) successfulFileOps;
+ double avgLatency1 = (double) totalTimeAL1 / successfulFileOps;
+ double avgLatency2 = (double) totalTimeAL2 / successfulFileOps;
// The time it takes for the longest running map is measured. Using that,
// cluster transactions per second is calculated. It includes time to
@@ -367,7 +379,7 @@ public class NNBench {
double longestMapTimeTPmS = (double) (mapEndTimeTPmS - mapStartTimeTPmS);
double totalTimeTPS = (longestMapTimeTPmS == 0) ?
(1000 * successfulFileOps) :
- (double) (1000 * successfulFileOps) / (double) longestMapTimeTPmS;
+ (double) (1000 * successfulFileOps) / longestMapTimeTPmS;
// The time it takes to perform 'n' operations is calculated (in ms),
// n being the number of files. Using that time, the average execution
@@ -375,22 +387,22 @@ public class NNBench {
// failed operations
double AverageExecutionTime = (totalTimeTPmS == 0) ?
(double) successfulFileOps :
- (double) (totalTimeTPmS / successfulFileOps);
+ (double) totalTimeTPmS / successfulFileOps;
if (operation.equals(OP_CREATE_WRITE)) {
// For create/write/close, it is treated as two transactions,
// since a file create from a client perspective involves create and close
resultTPSLine1 = " TPS: Create/Write/Close: " +
(int) (totalTimeTPS * 2);
- resultTPSLine2 = "Avg exec time (ms): Create/Write/Close: " +
- (double) AverageExecutionTime;
+ resultTPSLine2 = "Avg exec time (ms): Create/Write/Close: " +
+ AverageExecutionTime;
resultALLine1 = " Avg Lat (ms): Create/Write: " + avgLatency1;
resultALLine2 = " Avg Lat (ms): Close: " + avgLatency2;
} else if (operation.equals(OP_OPEN_READ)) {
resultTPSLine1 = " TPS: Open/Read: " +
(int) totalTimeTPS;
resultTPSLine2 = " Avg Exec time (ms): Open/Read: " +
- (double) AverageExecutionTime;
+ AverageExecutionTime;
resultALLine1 = " Avg Lat (ms): Open: " + avgLatency1;
if (readFileAfterOpen) {
resultALLine2 = " Avg Lat (ms): Read: " + avgLatency2;
@@ -399,13 +411,13 @@ public class NNBench {
resultTPSLine1 = " TPS: Rename: " +
(int) totalTimeTPS;
resultTPSLine2 = " Avg Exec time (ms): Rename: " +
- (double) AverageExecutionTime;
+ AverageExecutionTime;
resultALLine1 = " Avg Lat (ms): Rename: " + avgLatency1;
} else if (operation.equals(OP_DELETE)) {
resultTPSLine1 = " TPS: Delete: " +
(int) totalTimeTPS;
resultTPSLine2 = " Avg Exec time (ms): Delete: " +
- (double) AverageExecutionTime;
+ AverageExecutionTime;
resultALLine1 = " Avg Lat (ms): Delete: " + avgLatency1;
}
@@ -458,7 +470,7 @@ public class NNBench {
*
* @throws IOException on error
*/
- public static void runTests() throws IOException {
+ public static void runTests(Configuration config) throws IOException {
config.setLong("io.bytes.per.checksum", bytesPerChecksum);
JobConf job = new JobConf(config, NNBench.class);
@@ -558,36 +570,46 @@ public class NNBench {
/**
* Main method for running the NNBench benchmarks
*
+ * @param args array of command line arguments
* @throws IOException indicates a problem with test startup
*/
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new NNBench(), args);
+ System.exit(res);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ final Configuration config = getConf();
// Display the application version string
displayVersion();
// Parse the inputs
- parseInputs(args);
+ parseInputs(args, config);
// Validate inputs
validateInputs();
// Clean up files before the test run
- cleanupBeforeTestrun();
+ cleanupBeforeTestrun(config);
// Create control files before test run
- createControlFiles();
+ createControlFiles(config);
// Run the tests as a map reduce job
- runTests();
+ runTests(config);
// Analyze results
- analyzeResults();
+ analyzeResults(config);
+
+ return 0;
}
/**
* Mapper class
*/
- static class NNBenchMapper extends Configured
+ static class NNBenchMapper extends Configured
implements Mapper<Text, LongWritable, Text, Text> {
FileSystem filesystem = null;
private String hostName = null;
@@ -639,13 +661,15 @@ public class NNBench {
*/
public void close() throws IOException {
}
-
+
/**
- * Returns when the current number of seconds from the epoch equals
- * the command line argument given by <code>-startTime</code>.
- * This allows multiple instances of this program, running on clock
- * synchronized nodes, to start at roughly the same time.
- */
+ * Returns when the current number of seconds from the epoch equals
+ * the command line argument given by <code>-startTime</code>.
+ * This allows multiple instances of this program, running on clock
+ * synchronized nodes, to start at roughly the same time.
+ * @return true if the method was able to sleep until the time given by
+ * <code>-startTime</code> without interruption; false otherwise
+ */
private boolean barrier() {
long startTime = getConf().getLong("test.nnbench.starttime", 0l);
long currentTime = System.currentTimeMillis();
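
The hunk is cut off here, but the documented contract above pins down the
shape of the method. A sketch of the remainder, under the assumption that
a single sleep is attempted:

    // Sketch consistent with the javadoc above: sleep once until
    // test.nnbench.starttime and report whether the sleep completed
    // without interruption.
    boolean retVal = false;
    long sleepTime = startTime - currentTime;
    if (sleepTime > 0) {
      try {
        Thread.sleep(sleepTime);
        retVal = true;
      } catch (InterruptedException e) {
        retVal = false;
      }
    }
    return retVal;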
@@ -698,16 +722,16 @@ public class NNBench {
if (barrier()) {
if (op.equals(OP_CREATE_WRITE)) {
startTimeTPmS = System.currentTimeMillis();
- doCreateWriteOp("file_" + hostName + "_", output, reporter);
+ doCreateWriteOp("file_" + hostName + "_", reporter);
} else if (op.equals(OP_OPEN_READ)) {
startTimeTPmS = System.currentTimeMillis();
- doOpenReadOp("file_" + hostName + "_", output, reporter);
+ doOpenReadOp("file_" + hostName + "_", reporter);
} else if (op.equals(OP_RENAME)) {
startTimeTPmS = System.currentTimeMillis();
- doRenameOp("file_" + hostName + "_", output, reporter);
+ doRenameOp("file_" + hostName + "_", reporter);
} else if (op.equals(OP_DELETE)) {
startTimeTPmS = System.currentTimeMillis();
- doDeleteOp("file_" + hostName + "_", output, reporter);
+ doDeleteOp("file_" + hostName + "_", reporter);
}
endTimeTPms = System.currentTimeMillis();
@@ -735,11 +759,13 @@ public class NNBench {
/**
* Create and Write operation.
+ * @param name the prefix of the output file to be created
+ * @param reporter an instance of {@link Reporter} to be used for
+ * status updates
*/
private void doCreateWriteOp(String name,
- OutputCollector<Text, Text> output,
- Reporter reporter) {
- FSDataOutputStream out = null;
+ Reporter reporter) {
+ FSDataOutputStream out;
byte[] buffer = new byte[bytesToWrite];
for (long l = 0l; l < numberOfFiles; l++) {
@@ -783,11 +809,13 @@ public class NNBench {
/**
* Open operation
+ * @param name the prefix of the output file to be read
+ * @param reporter an instance of {@link Reporter} to be used for
+ * status updates
*/
private void doOpenReadOp(String name,
- OutputCollector<Text, Text> output,
- Reporter reporter) {
- FSDataInputStream input = null;
+ Reporter reporter) {
+ FSDataInputStream input;
byte[] buffer = new byte[bytesToWrite];
for (long l = 0l; l < numberOfFiles; l++) {
@@ -824,10 +852,12 @@ public class NNBench {
/**
* Rename operation
+ * @param name the prefix of the file to be renamed
+ * @param reporter an instance of {@link Reporter} to be used for
+ * status updates
*/
private void doRenameOp(String name,
- OutputCollector<Text, Text> output,
- Reporter reporter) {
+ Reporter reporter) {
for (long l = 0l; l < numberOfFiles; l++) {
Path filePath = new Path(new Path(baseDir, dataDirName),
name + "_" + l);
@@ -857,10 +887,12 @@ public class NNBench {
/**
* Delete operation
+ * @param name the prefix of the file to be deleted
+ * @param reporter an instance of {@link Reporter} to be used for
+ * status updates
*/
private void doDeleteOp(String name,
- OutputCollector<Text, Text> output,
- Reporter reporter) {
+ Reporter reporter) {
for (long l = 0l; l < numberOfFiles; l++) {
Path filePath = new Path(new Path(baseDir, dataDirName),
name + "_" + l);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBenchWithoutMR.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBenchWithoutMR.java?rev=1077042&r1=1077041&r2=1077042&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBenchWithoutMR.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBenchWithoutMR.java Fri Mar 4 03:35:06 2011
@@ -24,12 +24,16 @@ import java.util.Date;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
/**
* This program executes a specified operation that applies load to
@@ -43,7 +47,7 @@ import org.apache.hadoop.util.StringUtil
* This version does not use the map reduce framework
*
*/
-public class NNBenchWithoutMR {
+public class NNBenchWithoutMR extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.hdfs.NNBench");
@@ -59,7 +63,6 @@ public class NNBenchWithoutMR {
// variables initialized in main()
private static FileSystem fileSys = null;
private static Path taskDir = null;
- private static String uniqueId = null;
private static byte[] buffer;
private static long maxExceptionsPerFile = 200;
@@ -69,12 +72,14 @@ public class NNBenchWithoutMR {
* This allows multiple instances of this program, running on clock
* synchronized nodes, to start at roughly the same time.
*/
+
static void barrier() {
long sleepTime;
while ((sleepTime = startTime - System.currentTimeMillis()) > 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ex) {
+ // Intentionally left empty: the loop simply retries the sleep.
}
}
}
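
In contrast to NNBench's boolean barrier, this one retries the sleep after
an interrupt and returns nothing. The intended use is simply (the
two-minute offset is illustrative):

    // Every node is given the same absolute start time, a little in the
    // future, so that clock-synchronized nodes begin together. In the real
    // benchmark, startTime comes from -startTime (seconds from the epoch).
    startTime = System.currentTimeMillis() + 2 * 60 * 1000; // now + 2 minutes
    barrier(); // returns only once the start time has passed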
@@ -98,18 +103,20 @@ public class NNBenchWithoutMR {
static int createWrite() {
int totalExceptions = 0;
FSDataOutputStream out = null;
- boolean success = false;
+ boolean success;
for (int index = 0; index < numFiles; index++) {
int singleFileExceptions = 0;
do { // create file until is succeeds or max exceptions reached
try {
out = fileSys.create(
- new Path(taskDir, "" + index), false, 512, (short)1, bytesPerBlock);
+ new Path(taskDir, "" + index), false, 512,
+ (short)1, bytesPerBlock);
success = true;
} catch (IOException ioe) {
success=false;
totalExceptions++;
- handleException("creating file #" + index, ioe, ++singleFileExceptions);
+ handleException("creating file #" + index, ioe,
+ ++singleFileExceptions);
}
} while (!success);
long toBeWritten = bytesPerFile;
@@ -120,7 +127,8 @@ public class NNBenchWithoutMR {
out.write(buffer, 0, nbytes);
} catch (IOException ioe) {
totalExceptions++;
- handleException("writing to file #" + index, ioe, ++singleFileExceptions);
+ handleException("writing to file #" + index, ioe,
+ ++singleFileExceptions);
}
}
do { // close file until is succeeds
@@ -130,7 +138,8 @@ public class NNBenchWithoutMR {
} catch (IOException ioe) {
success=false;
totalExceptions++;
- handleException("closing file #" + index, ioe, ++singleFileExceptions);
+ handleException("closing file #" + index, ioe,
+ ++singleFileExceptions);
}
} while (!success);
}
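
The create and close loops above share one retry idiom, distilled below.
attemptOperation() is a hypothetical stand-in for fileSys.create() or
out.close(), and handleException() is the class's own helper, which is
assumed to abort the run once the per-file cap (maxExceptionsPerFile) is
exceeded:

    // Retry until the operation succeeds, counting failures globally and
    // per file; handleException() is expected to stop the run once a
    // single file has failed too many times.
    boolean success;
    int singleFileExceptions = 0;
    do {
      try {
        attemptOperation(); // hypothetical: e.g. fileSys.create(...)
        success = true;
      } catch (IOException ioe) {
        success = false;
        totalExceptions++;
        handleException("operation", ioe, ++singleFileExceptions);
      }
    } while (!success);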
@@ -144,7 +153,7 @@ public class NNBenchWithoutMR {
*/
static int openRead() {
int totalExceptions = 0;
- FSDataInputStream in = null;
+ FSDataInputStream in;
for (int index = 0; index < numFiles; index++) {
int singleFileExceptions = 0;
try {
@@ -153,11 +162,12 @@ public class NNBenchWithoutMR {
while (toBeRead > 0) {
int nbytes = (int) Math.min(buffer.length, toBeRead);
toBeRead -= nbytes;
- try { // only try once
+ try { // only try once; we do not care about the number of bytes read
in.read(buffer, 0, nbytes);
} catch (IOException ioe) {
totalExceptions++;
- handleException("reading from file #" + index, ioe, ++singleFileExceptions);
+ handleException("reading from file #" + index, ioe,
+ ++singleFileExceptions);
}
}
in.close();
@@ -177,19 +187,23 @@ public class NNBenchWithoutMR {
*/
static int rename() {
int totalExceptions = 0;
- boolean success = false;
+ boolean success;
for (int index = 0; index < numFiles; index++) {
int singleFileExceptions = 0;
do { // rename file until is succeeds
try {
- boolean result = fileSys.rename(
- new Path(taskDir, "" + index), new Path(taskDir, "A" + index));
+ // The boolean result of this operation is of no interest to us:
+ // rename() can return false only when the namesystem could not
+ // rename the path within the namespace (i.e. no exception has
+ // been thrown)
+ fileSys.rename(new Path(taskDir, "" + index),
+ new Path(taskDir, "A" + index));
success = true;
- } catch (IOException ioe) {
- success=false;
+ } catch (IOException ioe) {
+ success = false;
totalExceptions++;
handleException("creating file #" + index, ioe, ++singleFileExceptions);
- }
+ }
} while (!success);
}
return totalExceptions;
@@ -203,14 +217,18 @@ public class NNBenchWithoutMR {
*/
static int delete() {
int totalExceptions = 0;
- boolean success = false;
+ boolean success;
for (int index = 0; index < numFiles; index++) {
int singleFileExceptions = 0;
do { // delete file until is succeeds
try {
- boolean result = fileSys.delete(new Path(taskDir, "A" + index), true);
+ // The boolean result of this operation is of no interest to us:
+ // delete() can return false only when the namesystem could not
+ // remove the path from the namespace (i.e. no exception has
+ // been thrown)
+ fileSys.delete(new Path(taskDir, "A" + index), true);
success = true;
- } catch (IOException ioe) {
+ } catch (IOException ioe) {
success=false;
totalExceptions++;
handleException("creating file #" + index, ioe, ++singleFileExceptions);
@@ -239,15 +257,23 @@ public class NNBenchWithoutMR {
* [-bytesPerChecksum <value for io.bytes.per.checksum>]
* </pre>
*
+ * @param args is an array of the program command line arguments
* @throws IOException indicates a problem with test startup
*/
- public static void main(String[] args) throws IOException {
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new NNBenchWithoutMR(), args);
+ System.exit(res);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
String version = "NameNodeBenchmark.0.3";
System.out.println(version);
int bytesPerChecksum = -1;
String usage =
- "Usage: nnbench " +
+ "Usage: NNBenchWithoutMR " +
" -operation <one of createWrite, openRead, rename, or delete> " +
" -baseDir <base output/input DFS path> " +
" -startTime <time to start, given in seconds from the epoch> " +
@@ -275,13 +301,13 @@ public class NNBenchWithoutMR {
operation = args[++i];
} else {
System.out.println(usage);
- System.exit(-1);
+ return -1;
}
}
bytesPerFile = bytesPerBlock * blocksPerFile;
JobConf jobConf = new JobConf(new Configuration(), NNBench.class);
-
+
if ( bytesPerChecksum < 0 ) { // if it is not set in cmdline
bytesPerChecksum = jobConf.getInt("io.bytes.per.checksum", 512);
}
@@ -304,11 +330,11 @@ public class NNBenchWithoutMR {
bytesPerBlock % bytesPerChecksum != 0)
{
System.err.println(usage);
- System.exit(-1);
+ return -1;
}
fileSys = FileSystem.get(jobConf);
- uniqueId = java.net.InetAddress.getLocalHost().getHostName();
+ String uniqueId = java.net.InetAddress.getLocalHost().getHostName();
taskDir = new Path(baseDir, uniqueId);
// initialize buffer used for writing/reading file
buffer = new byte[(int) Math.min(bytesPerFile, 32768L)];
@@ -333,12 +359,14 @@ public class NNBenchWithoutMR {
exceptions = delete();
} else {
System.err.println(usage);
- System.exit(-1);
+ return -1;
}
endTime = new Date();
System.out.println("Job ended: " + endTime);
duration = (endTime.getTime() - execTime.getTime()) /1000;
System.out.println("The " + operation + " job took " + duration + " seconds.");
System.out.println("The job recorded " + exceptions + " exceptions.");
+
+ return 0;
}
}
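
With both benchmarks now running under ToolRunner, a non-default queue can
be selected from the command line with a generic -D option; for example
(jar name, registered program name, and queue are illustrative):

    hadoop jar hadoop-test.jar nnbench \
        -Dmapred.job.queue.name=myqueue \
        -operation create_write -baseDir /benchmarks/NNBench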