You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:35:01 UTC
svn commit: r1077041 - in
/hadoop/common/branches/branch-0.20-security-patches: ivy/
src/test/org/apache/hadoop/fs/ src/test/org/apache/hadoop/fs/loadGenerator/
src/test/org/apache/hadoop/hdfs/
Author: omalley
Date: Fri Mar 4 03:35:01 2011
New Revision: 1077041
URL: http://svn.apache.org/viewvc?rev=1077041&view=rev
Log:
commit b76cad571ab4d96d62e39eda20e37d86984a8588
Author: Erik Steffl <st...@yahoo-inc.com>
Date: Wed Oct 28 12:30:14 2009 -0700
Revert "HDFS:587 from https://issues.apache.org/jira/secure/attachment/12421243/jira.HDFS-587.branch-0.20-internal.patch"
This reverts commit 1f26e0aa1d0352345ab73e62f3eac2d8c4748cc5.
Conflicts:
YAHOO-CHANGES.txt
Resolved YAHOO-CHANGES.txt conflict by adding YAHOO-CHANGES.txt from HEAD
and removing HDFS-587 comment from it
+++ b/YAHOO-CHANGES.txt
Modified:
hadoop/common/branches/branch-0.20-security-patches/ivy/libraries.properties
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DFSCIOTest.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/IOMapperBase.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestDFSIO.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/loadGenerator/TestLoadGenerator.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBench.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBenchWithoutMR.java
Modified: hadoop/common/branches/branch-0.20-security-patches/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/ivy/libraries.properties?rev=1077041&r1=1077040&r2=1077041&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/ivy/libraries.properties (original)
+++ hadoop/common/branches/branch-0.20-security-patches/ivy/libraries.properties Fri Mar 4 03:35:01 2011
@@ -48,7 +48,7 @@ jsp-api.version=5.5.12
jets3t.version=0.6.1
jetty.version=6.1.14
jetty-util.version=6.1.14
-junit.version=4.5
+junit.version=3.8.1
jdiff.version=1.0.9
json.version=1.0
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/AccumulatingReducer.java?rev=1077041&r1=1077040&r2=1077041&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/AccumulatingReducer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/AccumulatingReducer.java Fri Mar 4 03:35:01 2011
@@ -22,8 +22,12 @@ import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
/**
* Reducer that accumulates values based on their type.
@@ -41,12 +45,8 @@ import org.apache.hadoop.mapred.*;
* </ul>
*
*/
-@SuppressWarnings("deprecation")
public class AccumulatingReducer extends MapReduceBase
- implements Reducer<Text, Text, Text, Text> {
- static final String VALUE_TYPE_LONG = "l:";
- static final String VALUE_TYPE_FLOAT = "f:";
- static final String VALUE_TYPE_STRING = "s:";
+ implements Reducer<UTF8, UTF8, UTF8, UTF8> {
private static final Log LOG = LogFactory.getLog(AccumulatingReducer.class);
protected String hostName;
@@ -61,9 +61,9 @@ public class AccumulatingReducer extends
LOG.info("Starting AccumulatingReducer on " + hostName);
}
- public void reduce(Text key,
- Iterator<Text> values,
- OutputCollector<Text, Text> output,
+ public void reduce(UTF8 key,
+ Iterator<UTF8> values,
+ OutputCollector<UTF8, UTF8> output,
Reporter reporter
) throws IOException {
String field = key.toString();
@@ -71,30 +71,30 @@ public class AccumulatingReducer extends
reporter.setStatus("starting " + field + " ::host = " + hostName);
// concatenate strings
- if (field.startsWith(VALUE_TYPE_STRING)) {
- StringBuffer sSum = new StringBuffer();
+ if (field.startsWith("s:")) {
+ String sSum = "";
while (values.hasNext())
- sSum.append(values.next().toString()).append(";");
- output.collect(key, new Text(sSum.toString()));
+ sSum += values.next().toString() + ";";
+ output.collect(key, new UTF8(sSum));
reporter.setStatus("finished " + field + " ::host = " + hostName);
return;
}
// sum long values
- if (field.startsWith(VALUE_TYPE_FLOAT)) {
+ if (field.startsWith("f:")) {
float fSum = 0;
while (values.hasNext())
fSum += Float.parseFloat(values.next().toString());
- output.collect(key, new Text(String.valueOf(fSum)));
+ output.collect(key, new UTF8(String.valueOf(fSum)));
reporter.setStatus("finished " + field + " ::host = " + hostName);
return;
}
// sum long values
- if (field.startsWith(VALUE_TYPE_LONG)) {
+ if (field.startsWith("l:")) {
long lSum = 0;
while (values.hasNext()) {
lSum += Long.parseLong(values.next().toString());
}
- output.collect(key, new Text(String.valueOf(lSum)));
+ output.collect(key, new UTF8(String.valueOf(lSum)));
}
reporter.setStatus("finished " + field + " ::host = " + hostName);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DFSCIOTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DFSCIOTest.java?rev=1077041&r1=1077040&r2=1077041&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DFSCIOTest.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DFSCIOTest.java Fri Mar 4 03:35:01 2011
@@ -18,28 +18,18 @@
package org.apache.hadoop.fs;
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintStream;
+import java.io.*;
+
+import junit.framework.TestCase;
import java.util.Date;
import java.util.StringTokenizer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.commons.logging.*;
+
import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.Test;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.conf.*;
/**
* Distributed i/o benchmark.
@@ -68,9 +58,8 @@ import org.junit.Test;
* <li>standard i/o rate deviation</li>
* </ul>
*/
-public class DFSCIOTest extends Configured implements Tool {
+public class DFSCIOTest extends TestCase {
// Constants
- private static final Log LOG = LogFactory.getLog(DFSCIOTest.class);
private static final int TEST_TYPE_READ = 0;
private static final int TEST_TYPE_WRITE = 1;
private static final int TEST_TYPE_CLEANUP = 2;
@@ -78,6 +67,7 @@ public class DFSCIOTest extends Configur
private static final String BASE_FILE_NAME = "test_io_";
private static final String DEFAULT_RES_FILE_NAME = "DFSCIOTest_results.log";
+ private static final Log LOG = FileInputFormat.LOG;
private static Configuration fsConfig = new Configuration();
private static final long MEGA = 0x100000;
private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/DFSCIOTest");
@@ -98,7 +88,6 @@ public class DFSCIOTest extends Configur
*
* @throws Exception
*/
- @Test
public void testIOs() throws Exception {
testIOs(10, 10);
}
@@ -135,9 +124,9 @@ public class DFSCIOTest extends Configur
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
- Text.class, LongWritable.class,
+ UTF8.class, LongWritable.class,
CompressionType.NONE);
- writer.append(new Text(name), new LongWritable(fileSize));
+ writer.append(new UTF8(name), new LongWritable(fileSize));
} catch(Exception e) {
throw new IOException(e.getLocalizedMessage());
} finally {
@@ -165,30 +154,26 @@ public class DFSCIOTest extends Configur
* <li>i/o rate squared</li>
* </ul>
*/
- private abstract static class IOStatMapper extends IOMapperBase<Long> {
+ private abstract static class IOStatMapper extends IOMapperBase {
IOStatMapper() {
+ super(fsConfig);
}
- void collectStats(OutputCollector<Text, Text> output,
+ void collectStats(OutputCollector<UTF8, UTF8> output,
String name,
long execTime,
- Long objSize) throws IOException {
- long totalSize = objSize.longValue();
+ Object objSize) throws IOException {
+ long totalSize = ((Long)objSize).longValue();
float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
LOG.info("Number of bytes processed = " + totalSize);
LOG.info("Exec time = " + execTime);
LOG.info("IO rate = " + ioRateMbSec);
- output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
- new Text(String.valueOf(1)));
- output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
- new Text(String.valueOf(totalSize)));
- output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
- new Text(String.valueOf(execTime)));
- output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
- new Text(String.valueOf(ioRateMbSec*1000)));
- output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
- new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
+ output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
+ output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
+ output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
+ output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
+ output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
}
}
@@ -203,7 +188,7 @@ public class DFSCIOTest extends Configur
buffer[i] = (byte)('0' + i % 50);
}
- public Long doIO(Reporter reporter,
+ public Object doIO(Reporter reporter,
String name,
long totalSize
) throws IOException {
@@ -289,8 +274,8 @@ public class DFSCIOTest extends Configur
job.setReducerClass(AccumulatingReducer.class);
FileOutputFormat.setOutputPath(job, outputDir);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
+ job.setOutputKeyClass(UTF8.class);
+ job.setOutputValueClass(UTF8.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
@@ -304,7 +289,7 @@ public class DFSCIOTest extends Configur
super();
}
- public Long doIO(Reporter reporter,
+ public Object doIO(Reporter reporter,
String name,
long totalSize
) throws IOException {
@@ -399,79 +384,7 @@ public class DFSCIOTest extends Configur
MEGA*fileSize);
}
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new TestDFSIO(), args);
- System.exit(res);
- }
-
- private static void analyzeResult( FileSystem fs,
- int testType,
- long execTime,
- String resFileName
- ) throws IOException {
- Path reduceFile;
- if (testType == TEST_TYPE_WRITE)
- reduceFile = new Path(WRITE_DIR, "part-00000");
- else
- reduceFile = new Path(READ_DIR, "part-00000");
- DataInputStream in;
- in = new DataInputStream(fs.open(reduceFile));
-
- BufferedReader lines;
- lines = new BufferedReader(new InputStreamReader(in));
- long tasks = 0;
- long size = 0;
- long time = 0;
- float rate = 0;
- float sqrate = 0;
- String line;
- while((line = lines.readLine()) != null) {
- StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
- String attr = tokens.nextToken();
- if (attr.endsWith(":tasks"))
- tasks = Long.parseLong(tokens.nextToken());
- else if (attr.endsWith(":size"))
- size = Long.parseLong(tokens. nextToken());
- else if (attr.endsWith(":time"))
- time = Long.parseLong(tokens.nextToken());
- else if (attr.endsWith(":rate"))
- rate = Float.parseFloat(tokens.nextToken());
- else if (attr.endsWith(":sqrate"))
- sqrate = Float.parseFloat(tokens.nextToken());
- }
-
- double med = rate / 1000 / tasks;
- double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
- String resultLines[] = {
- "----- DFSCIOTest ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
- (testType == TEST_TYPE_READ) ? "read" :
- "unknown"),
- " Date & time: " + new Date(System.currentTimeMillis()),
- " Number of files: " + tasks,
- "Total MBytes processed: " + size/MEGA,
- " Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
- "Average IO rate mb/sec: " + med,
- " Std IO rate deviation: " + stdDev,
- " Test exec time sec: " + (float)execTime / 1000,
- "" };
-
- PrintStream res = new PrintStream(
- new FileOutputStream(
- new File(resFileName), true));
- for(int i = 0; i < resultLines.length; i++) {
- LOG.info(resultLines[i]);
- res.println(resultLines[i]);
- }
- }
-
- private static void cleanup(FileSystem fs) throws Exception {
- LOG.info("Cleaning up test files");
- fs.delete(new Path(TEST_ROOT_DIR), true);
- fs.delete(HDFS_TEST_DIR, true);
- }
-
- @Override
- public int run(String[] args) throws Exception {
+ public static void main(String[] args) {
int testType = TEST_TYPE_READ;
int bufferSize = DEFAULT_BUFFER_SIZE;
int fileSize = 1;
@@ -536,11 +449,11 @@ public class DFSCIOTest extends Configur
long execTime = System.currentTimeMillis() - tStart;
String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
LOG.info(resultLine);
- return 0;
+ return;
}
if (testType == TEST_TYPE_CLEANUP) {
cleanup(fs);
- return 0;
+ return;
}
createControlFile(fs, fileSize, nrFiles);
long tStart = System.currentTimeMillis();
@@ -553,8 +466,73 @@ public class DFSCIOTest extends Configur
analyzeResult(fs, testType, execTime, resFileName);
} catch(Exception e) {
System.err.print(e.getLocalizedMessage());
- return -1;
+ System.exit(-1);
+ }
+ }
+
+ private static void analyzeResult( FileSystem fs,
+ int testType,
+ long execTime,
+ String resFileName
+ ) throws IOException {
+ Path reduceFile;
+ if (testType == TEST_TYPE_WRITE)
+ reduceFile = new Path(WRITE_DIR, "part-00000");
+ else
+ reduceFile = new Path(READ_DIR, "part-00000");
+ DataInputStream in;
+ in = new DataInputStream(fs.open(reduceFile));
+
+ BufferedReader lines;
+ lines = new BufferedReader(new InputStreamReader(in));
+ long tasks = 0;
+ long size = 0;
+ long time = 0;
+ float rate = 0;
+ float sqrate = 0;
+ String line;
+ while((line = lines.readLine()) != null) {
+ StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
+ String attr = tokens.nextToken();
+ if (attr.endsWith(":tasks"))
+ tasks = Long.parseLong(tokens.nextToken());
+ else if (attr.endsWith(":size"))
+ size = Long.parseLong(tokens. nextToken());
+ else if (attr.endsWith(":time"))
+ time = Long.parseLong(tokens.nextToken());
+ else if (attr.endsWith(":rate"))
+ rate = Float.parseFloat(tokens.nextToken());
+ else if (attr.endsWith(":sqrate"))
+ sqrate = Float.parseFloat(tokens.nextToken());
+ }
+
+ double med = rate / 1000 / tasks;
+ double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
+ String resultLines[] = {
+ "----- DFSCIOTest ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
+ (testType == TEST_TYPE_READ) ? "read" :
+ "unknown"),
+ " Date & time: " + new Date(System.currentTimeMillis()),
+ " Number of files: " + tasks,
+ "Total MBytes processed: " + size/MEGA,
+ " Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
+ "Average IO rate mb/sec: " + med,
+ " Std IO rate deviation: " + stdDev,
+ " Test exec time sec: " + (float)execTime / 1000,
+ "" };
+
+ PrintStream res = new PrintStream(
+ new FileOutputStream(
+ new File(resFileName), true));
+ for(int i = 0; i < resultLines.length; i++) {
+ LOG.info(resultLines[i]);
+ res.println(resultLines[i]);
}
- return 0;
+ }
+
+ private static void cleanup(FileSystem fs) throws Exception {
+ LOG.info("Cleaning up test files");
+ fs.delete(new Path(TEST_ROOT_DIR), true);
+ fs.delete(HDFS_TEST_DIR, true);
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DistributedFSCheck.java?rev=1077041&r1=1077040&r2=1077041&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DistributedFSCheck.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/DistributedFSCheck.java Fri Mar 4 03:35:01 2011
@@ -18,31 +18,20 @@
package org.apache.hadoop.fs;
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintStream;
+import java.io.*;
+
+import junit.framework.TestCase;
import java.util.Date;
import java.util.StringTokenizer;
import java.util.TreeSet;
import java.util.Vector;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.commons.logging.*;
+
import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.Test;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.conf.*;
/**
* Distributed checkup of the file system consistency.
@@ -54,9 +43,8 @@ import org.junit.Test;
* Optionally displays statistics on read performance.
*
*/
-public class DistributedFSCheck extends Configured implements Tool {
+public class DistributedFSCheck extends TestCase {
// Constants
- private static final Log LOG = LogFactory.getLog(DistributedFSCheck.class);
private static final int TEST_TYPE_READ = 0;
private static final int TEST_TYPE_CLEANUP = 2;
private static final int DEFAULT_BUFFER_SIZE = 1000000;
@@ -64,6 +52,7 @@ public class DistributedFSCheck extends
private static final long MEGA = 0x100000;
private static Configuration fsConfig = new Configuration();
+ private static final Log LOG = FileInputFormat.LOG;
private static Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/benchmarks/DistributedFSCheck"));
private static Path MAP_INPUT_DIR = new Path(TEST_ROOT_DIR, "map_input");
private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
@@ -81,7 +70,6 @@ public class DistributedFSCheck extends
*
* @throws Exception
*/
- @Test
public void testFSBlocks() throws Exception {
testFSBlocks("/");
}
@@ -104,7 +92,7 @@ public class DistributedFSCheck extends
Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
SequenceFile.Writer writer =
SequenceFile.createWriter(fs, fsConfig, inputFile,
- Text.class, LongWritable.class, CompressionType.NONE);
+ UTF8.class, LongWritable.class, CompressionType.NONE);
try {
nrFiles = 0;
@@ -118,41 +106,30 @@ public class DistributedFSCheck extends
private void listSubtree(Path rootFile,
SequenceFile.Writer writer
) throws IOException {
- FileStatus rootStatus = fs.getFileStatus(rootFile);
- listSubtree(rootStatus, writer);
- }
-
- private void listSubtree(FileStatus rootStatus,
- SequenceFile.Writer writer
- ) throws IOException {
- Path rootFile = rootStatus.getPath();
- if (!rootStatus.isDir()) {
+ if (!fs.isDirectory(rootFile)) {
nrFiles++;
// For a regular file generate <fName,offset> pairs
long blockSize = fs.getDefaultBlockSize();
- long fileLength = rootStatus.getLen();
+ long fileLength = fs.getLength(rootFile);
for(long offset = 0; offset < fileLength; offset += blockSize)
- writer.append(new Text(rootFile.toString()), new LongWritable(offset));
+ writer.append(new UTF8(rootFile.toString()), new LongWritable(offset));
return;
}
- FileStatus [] children = null;
- try {
- children = fs.listStatus(rootFile);
- } catch (FileNotFoundException fnfe ){
+ FileStatus children[] = fs.listStatus(rootFile);
+ if (children == null)
throw new IOException("Could not get listing for " + rootFile);
- }
-
for (int i = 0; i < children.length; i++)
- listSubtree(children[i], writer);
+ listSubtree(children[i].getPath(), writer);
}
/**
* DistributedFSCheck mapper class.
*/
- public static class DistributedFSCheckMapper extends IOMapperBase<Object> {
+ public static class DistributedFSCheckMapper extends IOMapperBase {
public DistributedFSCheckMapper() {
+ super(fsConfig);
}
public Object doIO(Reporter reporter,
@@ -186,17 +163,14 @@ public class DistributedFSCheck extends
return new Long(actualSize);
}
- void collectStats(OutputCollector<Text, Text> output,
+ void collectStats(OutputCollector<UTF8, UTF8> output,
String name,
long execTime,
Object corruptedBlock) throws IOException {
- output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "blocks"),
- new Text(String.valueOf(1)));
+ output.collect(new UTF8("l:blocks"), new UTF8(String.valueOf(1)));
if (corruptedBlock.getClass().getName().endsWith("String")) {
- output.collect(
- new Text(AccumulatingReducer.VALUE_TYPE_STRING + "badBlocks"),
- new Text((String)corruptedBlock));
+ output.collect(new UTF8("s:badBlocks"), new UTF8((String)corruptedBlock));
return;
}
long totalSize = ((Long)corruptedBlock).longValue();
@@ -205,12 +179,9 @@ public class DistributedFSCheck extends
LOG.info("Exec time = " + execTime);
LOG.info("IO rate = " + ioRateMbSec);
- output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
- new Text(String.valueOf(totalSize)));
- output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
- new Text(String.valueOf(execTime)));
- output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
- new Text(String.valueOf(ioRateMbSec*1000)));
+ output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
+ output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
+ output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
}
}
@@ -224,17 +195,59 @@ public class DistributedFSCheck extends
job.setReducerClass(AccumulatingReducer.class);
FileOutputFormat.setOutputPath(job, READ_DIR);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
+ job.setOutputKeyClass(UTF8.class);
+ job.setOutputValueClass(UTF8.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
- public static void main(String[] args) throws Exception{
- int res = ToolRunner.run(new TestDFSIO(), args);
- System.exit(res);
- }
+ public static void main(String[] args) throws Exception {
+ int testType = TEST_TYPE_READ;
+ int bufferSize = DEFAULT_BUFFER_SIZE;
+ String resFileName = DEFAULT_RES_FILE_NAME;
+ String rootName = "/";
+ boolean viewStats = false;
+
+ String usage = "Usage: DistributedFSCheck [-root name] [-clean] [-resFile resultFileName] [-bufferSize Bytes] [-stats] ";
+
+ if (args.length == 1 && args[0].startsWith("-h")) {
+ System.err.println(usage);
+ System.exit(-1);
+ }
+ for(int i = 0; i < args.length; i++) { // parse command line
+ if (args[i].equals("-root")) {
+ rootName = args[++i];
+ } else if (args[i].startsWith("-clean")) {
+ testType = TEST_TYPE_CLEANUP;
+ } else if (args[i].equals("-bufferSize")) {
+ bufferSize = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-resFile")) {
+ resFileName = args[++i];
+ } else if (args[i].startsWith("-stat")) {
+ viewStats = true;
+ }
+ }
+ LOG.info("root = " + rootName);
+ LOG.info("bufferSize = " + bufferSize);
+
+ Configuration conf = new Configuration();
+ conf.setInt("test.io.file.buffer.size", bufferSize);
+ DistributedFSCheck test = new DistributedFSCheck(conf);
+
+ if (testType == TEST_TYPE_CLEANUP) {
+ test.cleanup();
+ return;
+ }
+ test.createInputFile(rootName);
+ long tStart = System.currentTimeMillis();
+ test.runDistributedFSCheck();
+ long execTime = System.currentTimeMillis() - tStart;
+
+ test.analyzeResult(execTime, resFileName, viewStats);
+ // test.cleanup(); // clean up after all to restore the system state
+ }
+
private void analyzeResult(long execTime,
String resFileName,
boolean viewStats
@@ -315,53 +328,4 @@ public class DistributedFSCheck extends
LOG.info("Cleaning up test files");
fs.delete(TEST_ROOT_DIR, true);
}
-
- @Override
- public int run(String[] args) throws Exception {
- int testType = TEST_TYPE_READ;
- int bufferSize = DEFAULT_BUFFER_SIZE;
- String resFileName = DEFAULT_RES_FILE_NAME;
- String rootName = "/";
- boolean viewStats = false;
-
- String usage = "Usage: DistributedFSCheck [-root name] [-clean] [-resFile resultFileName] [-bufferSize Bytes] [-stats] ";
-
- if (args.length == 1 && args[0].startsWith("-h")) {
- System.err.println(usage);
- return -1;
- }
- for(int i = 0; i < args.length; i++) { // parse command line
- if (args[i].equals("-root")) {
- rootName = args[++i];
- } else if (args[i].startsWith("-clean")) {
- testType = TEST_TYPE_CLEANUP;
- } else if (args[i].equals("-bufferSize")) {
- bufferSize = Integer.parseInt(args[++i]);
- } else if (args[i].equals("-resFile")) {
- resFileName = args[++i];
- } else if (args[i].startsWith("-stat")) {
- viewStats = true;
- }
- }
-
- LOG.info("root = " + rootName);
- LOG.info("bufferSize = " + bufferSize);
-
- Configuration conf = new Configuration();
- conf.setInt("test.io.file.buffer.size", bufferSize);
- DistributedFSCheck test = new DistributedFSCheck(conf);
-
- if (testType == TEST_TYPE_CLEANUP) {
- test.cleanup();
- return 0;
- }
- test.createInputFile(rootName);
- long tStart = System.currentTimeMillis();
- test.runDistributedFSCheck();
- long execTime = System.currentTimeMillis() - tStart;
-
- test.analyzeResult(execTime, resFileName, viewStats);
- // test.cleanup(); // clean up after all to restore the system state
- return 0;
- }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/IOMapperBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/IOMapperBase.java?rev=1077041&r1=1077040&r2=1077041&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/IOMapperBase.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/IOMapperBase.java Fri Mar 4 03:35:01 2011
@@ -19,10 +19,16 @@ package org.apache.hadoop.fs;
import java.io.IOException;
import java.net.InetAddress;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
/**
* Base mapper class for IO operations.
@@ -33,20 +39,16 @@ import org.apache.hadoop.mapred.*;
* statistics data to be collected by subsequent reducers.
*
*/
-@SuppressWarnings("deprecation")
-public abstract class IOMapperBase<T> extends Configured
- implements Mapper<Text, LongWritable, Text, Text> {
+public abstract class IOMapperBase extends Configured
+ implements Mapper<UTF8, LongWritable, UTF8, UTF8> {
protected byte[] buffer;
protected int bufferSize;
protected FileSystem fs;
protected String hostName;
- public IOMapperBase() {
- }
-
- public void configure(JobConf conf) {
- setConf(conf);
+ public IOMapperBase(Configuration conf) {
+ super(conf);
try {
fs = FileSystem.get(conf);
} catch (Exception e) {
@@ -61,6 +63,10 @@ public abstract class IOMapperBase<T> ex
}
}
+ public void configure(JobConf job) {
+ setConf(job);
+ }
+
public void close() throws IOException {
}
@@ -74,7 +80,7 @@ public abstract class IOMapperBase<T> ex
* {@link #collectStats(OutputCollector,String,long,Object)}
* @throws IOException
*/
- abstract T doIO(Reporter reporter,
+ abstract Object doIO(Reporter reporter,
String name,
long value) throws IOException;
@@ -87,10 +93,10 @@ public abstract class IOMapperBase<T> ex
* @param doIOReturnValue value returned by {@link #doIO(Reporter,String,long)}
* @throws IOException
*/
- abstract void collectStats(OutputCollector<Text, Text> output,
+ abstract void collectStats(OutputCollector<UTF8, UTF8> output,
String name,
long execTime,
- T doIOReturnValue) throws IOException;
+ Object doIOReturnValue) throws IOException;
/**
* Map file name and offset into statistical data.
@@ -105,9 +111,9 @@ public abstract class IOMapperBase<T> ex
* {@link #collectStats(OutputCollector,String,long,Object)}
* is called to prepare stat data for a subsequent reducer.
*/
- public void map(Text key,
+ public void map(UTF8 key,
LongWritable value,
- OutputCollector<Text, Text> output,
+ OutputCollector<UTF8, UTF8> output,
Reporter reporter) throws IOException {
String name = key.toString();
long longValue = value.get();
@@ -115,7 +121,7 @@ public abstract class IOMapperBase<T> ex
reporter.setStatus("starting " + name + " ::host = " + hostName);
long tStart = System.currentTimeMillis();
- T statValue = doIO(reporter, name, longValue);
+ Object statValue = doIO(reporter, name, longValue);
long tEnd = System.currentTimeMillis();
long execTime = tEnd - tStart;
collectStats(output, name, execTime, statValue);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestDFSIO.java?rev=1077041&r1=1077040&r2=1077041&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/TestDFSIO.java Fri Mar 4 03:35:01 2011
@@ -18,30 +18,19 @@
package org.apache.hadoop.fs;
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.PrintStream;
+import java.io.*;
+
+import junit.framework.TestCase;
import java.util.Date;
import java.util.StringTokenizer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.commons.logging.*;
+
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.Test;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.conf.*;
/**
* Distributed i/o benchmark.
@@ -70,9 +59,8 @@ import org.junit.Test;
* <li>standard deviation of i/o rate </li>
* </ul>
*/
-public class TestDFSIO extends Configured implements Tool {
+public class TestDFSIO extends TestCase {
// Constants
- private static final Log LOG = LogFactory.getLog(TestDFSIO.class);
private static final int TEST_TYPE_READ = 0;
private static final int TEST_TYPE_WRITE = 1;
private static final int TEST_TYPE_CLEANUP = 2;
@@ -80,6 +68,8 @@ public class TestDFSIO extends Configure
private static final String BASE_FILE_NAME = "test_io_";
private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
+ private static final Log LOG = FileInputFormat.LOG;
+ private static Configuration fsConfig = new Configuration();
private static final long MEGA = 0x100000;
private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO");
private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control");
@@ -87,19 +77,13 @@ public class TestDFSIO extends Configure
private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data");
- static{
- Configuration.addDefaultResource("hdfs-default.xml");
- Configuration.addDefaultResource("hdfs-site.xml");
- }
-
/**
* Run the test with default parameters.
*
* @throws Exception
*/
- @Test
public void testIOs() throws Exception {
- testIOs(10, 10, new Configuration());
+ testIOs(10, 10);
}
/**
@@ -109,21 +93,21 @@ public class TestDFSIO extends Configure
* @param nrFiles number of files
* @throws IOException
*/
- public static void testIOs(int fileSize, int nrFiles, Configuration fsConfig)
+ public static void testIOs(int fileSize, int nrFiles)
throws IOException {
FileSystem fs = FileSystem.get(fsConfig);
- createControlFile(fs, fileSize, nrFiles, fsConfig);
- writeTest(fs, fsConfig);
- readTest(fs, fsConfig);
+ createControlFile(fs, fileSize, nrFiles);
+ writeTest(fs);
+ readTest(fs);
cleanup(fs);
}
- private static void createControlFile(FileSystem fs,
+ private static void createControlFile(
+ FileSystem fs,
int fileSize, // in MB
- int nrFiles,
- Configuration fsConfig
+ int nrFiles
) throws IOException {
LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files");
@@ -135,9 +119,9 @@ public class TestDFSIO extends Configure
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
- Text.class, LongWritable.class,
+ UTF8.class, LongWritable.class,
CompressionType.NONE);
- writer.append(new Text(name), new LongWritable(fileSize));
+ writer.append(new UTF8(name), new LongWritable(fileSize));
} catch(Exception e) {
throw new IOException(e.getLocalizedMessage());
} finally {
@@ -165,44 +149,41 @@ public class TestDFSIO extends Configure
* <li>i/o rate squared</li>
* </ul>
*/
- private abstract static class IOStatMapper<T> extends IOMapperBase<T> {
+ private abstract static class IOStatMapper extends IOMapperBase {
IOStatMapper() {
+ super(fsConfig);
}
- void collectStats(OutputCollector<Text, Text> output,
+ void collectStats(OutputCollector<UTF8, UTF8> output,
String name,
long execTime,
- Long objSize) throws IOException {
- long totalSize = objSize.longValue();
+ Object objSize) throws IOException {
+ long totalSize = ((Long)objSize).longValue();
float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
LOG.info("Number of bytes processed = " + totalSize);
LOG.info("Exec time = " + execTime);
LOG.info("IO rate = " + ioRateMbSec);
- output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
- new Text(String.valueOf(1)));
- output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
- new Text(String.valueOf(totalSize)));
- output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
- new Text(String.valueOf(execTime)));
- output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
- new Text(String.valueOf(ioRateMbSec*1000)));
- output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
- new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
+ output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
+ output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
+ output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
+ output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
+ output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
}
}
/**
* Write mapper class.
*/
- public static class WriteMapper extends IOStatMapper<Long> {
+ public static class WriteMapper extends IOStatMapper {
public WriteMapper() {
+ super();
for(int i=0; i < bufferSize; i++)
buffer[i] = (byte)('0' + i % 50);
}
- public Long doIO(Reporter reporter,
+ public Object doIO(Reporter reporter,
String name,
long totalSize
) throws IOException {
@@ -224,24 +205,22 @@ public class TestDFSIO extends Configure
} finally {
out.close();
}
- return Long.valueOf(totalSize);
+ return new Long(totalSize);
}
}
- private static void writeTest(FileSystem fs, Configuration fsConfig)
- throws IOException {
+ private static void writeTest(FileSystem fs)
+ throws IOException {
fs.delete(DATA_DIR, true);
fs.delete(WRITE_DIR, true);
- runIOTest(WriteMapper.class, WRITE_DIR, fsConfig);
+ runIOTest(WriteMapper.class, WRITE_DIR);
}
- @SuppressWarnings("deprecation")
- private static void runIOTest(
- Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
- Path outputDir,
- Configuration fsConfig) throws IOException {
+ private static void runIOTest( Class<? extends Mapper> mapperClass,
+ Path outputDir
+ ) throws IOException {
JobConf job = new JobConf(fsConfig, TestDFSIO.class);
FileInputFormat.setInputPaths(job, CONTROL_DIR);
@@ -251,8 +230,8 @@ public class TestDFSIO extends Configure
job.setReducerClass(AccumulatingReducer.class);
FileOutputFormat.setOutputPath(job, outputDir);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
+ job.setOutputKeyClass(UTF8.class);
+ job.setOutputValueClass(UTF8.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
@@ -260,12 +239,13 @@ public class TestDFSIO extends Configure
/**
* Read mapper class.
*/
- public static class ReadMapper extends IOStatMapper<Long> {
+ public static class ReadMapper extends IOStatMapper {
public ReadMapper() {
+ super();
}
- public Long doIO(Reporter reporter,
+ public Object doIO(Reporter reporter,
String name,
long totalSize
) throws IOException {
@@ -284,22 +264,22 @@ public class TestDFSIO extends Configure
} finally {
in.close();
}
- return Long.valueOf(totalSize);
+ return new Long(totalSize);
}
}
- private static void readTest(FileSystem fs, Configuration fsConfig)
- throws IOException {
+ private static void readTest(FileSystem fs) throws IOException {
fs.delete(READ_DIR, true);
- runIOTest(ReadMapper.class, READ_DIR, fsConfig);
+ runIOTest(ReadMapper.class, READ_DIR);
}
- private static void sequentialTest(FileSystem fs,
+ private static void sequentialTest(
+ FileSystem fs,
int testType,
int fileSize,
int nrFiles
) throws Exception {
- IOStatMapper<Long> ioer = null;
+ IOStatMapper ioer = null;
if (testType == TEST_TYPE_READ)
ioer = new ReadMapper();
else if (testType == TEST_TYPE_WRITE)
@@ -312,102 +292,21 @@ public class TestDFSIO extends Configure
MEGA*fileSize);
}
- public static void main(String[] args) throws Exception{
- int res = ToolRunner.run(new TestDFSIO(), args);
- System.exit(res);
- }
-
- private static void analyzeResult( FileSystem fs,
- int testType,
- long execTime,
- String resFileName
- ) throws IOException {
- Path reduceFile;
- if (testType == TEST_TYPE_WRITE)
- reduceFile = new Path(WRITE_DIR, "part-00000");
- else
- reduceFile = new Path(READ_DIR, "part-00000");
- long tasks = 0;
- long size = 0;
- long time = 0;
- float rate = 0;
- float sqrate = 0;
- DataInputStream in = null;
- BufferedReader lines = null;
- try {
- in = new DataInputStream(fs.open(reduceFile));
- lines = new BufferedReader(new InputStreamReader(in));
- String line;
- while((line = lines.readLine()) != null) {
- StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
- String attr = tokens.nextToken();
- if (attr.endsWith(":tasks"))
- tasks = Long.parseLong(tokens.nextToken());
- else if (attr.endsWith(":size"))
- size = Long.parseLong(tokens.nextToken());
- else if (attr.endsWith(":time"))
- time = Long.parseLong(tokens.nextToken());
- else if (attr.endsWith(":rate"))
- rate = Float.parseFloat(tokens.nextToken());
- else if (attr.endsWith(":sqrate"))
- sqrate = Float.parseFloat(tokens.nextToken());
- }
- } finally {
- if(in != null) in.close();
- if(lines != null) lines.close();
- }
-
- double med = rate / 1000 / tasks;
- double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
- String resultLines[] = {
- "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
- (testType == TEST_TYPE_READ) ? "read" :
- "unknown"),
- " Date & time: " + new Date(System.currentTimeMillis()),
- " Number of files: " + tasks,
- "Total MBytes processed: " + size/MEGA,
- " Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
- "Average IO rate mb/sec: " + med,
- " IO rate std deviation: " + stdDev,
- " Test exec time sec: " + (float)execTime / 1000,
- "" };
-
- PrintStream res = null;
- try {
- res = new PrintStream(new FileOutputStream(new File(resFileName), true));
- for(int i = 0; i < resultLines.length; i++) {
- LOG.info(resultLines[i]);
- res.println(resultLines[i]);
- }
- } finally {
- if(res != null) res.close();
- }
- }
-
- private static void cleanup(FileSystem fs) throws IOException {
- LOG.info("Cleaning up test files");
- fs.delete(new Path(TEST_ROOT_DIR), true);
- }
-
- @Override
- public int run(String[] args) throws Exception {
+ public static void main(String[] args) {
int testType = TEST_TYPE_READ;
int bufferSize = DEFAULT_BUFFER_SIZE;
int fileSize = 1;
int nrFiles = 1;
String resFileName = DEFAULT_RES_FILE_NAME;
boolean isSequential = false;
-
- String className = TestDFSIO.class.getSimpleName();
- String version = className + ".0.0.4";
- String usage = "Usage: " + className + " -read | -write | -clean " +
- "[-nrFiles N] [-fileSize MB] [-resFile resultFileName] " +
- "[-bufferSize Bytes] ";
+
+ String version="TestFDSIO.0.0.4";
+ String usage = "Usage: TestFDSIO -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] ";
System.out.println(version);
if (args.length == 0) {
System.err.println(usage);
- return -1;
+ System.exit(-1);
}
for (int i = 0; i < args.length; i++) { // parse command line
if (args[i].startsWith("-read")) {
@@ -434,7 +333,6 @@ public class TestDFSIO extends Configure
LOG.info("bufferSize = " + bufferSize);
try {
- Configuration fsConfig = new Configuration(getConf());
fsConfig.setInt("test.io.file.buffer.size", bufferSize);
FileSystem fs = FileSystem.get(fsConfig);
@@ -444,25 +342,89 @@ public class TestDFSIO extends Configure
long execTime = System.currentTimeMillis() - tStart;
String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
LOG.info(resultLine);
- return 0;
+ return;
}
if (testType == TEST_TYPE_CLEANUP) {
cleanup(fs);
- return 0;
+ return;
}
- createControlFile(fs, fileSize, nrFiles, fsConfig);
+ createControlFile(fs, fileSize, nrFiles);
long tStart = System.currentTimeMillis();
if (testType == TEST_TYPE_WRITE)
- writeTest(fs, fsConfig);
+ writeTest(fs);
if (testType == TEST_TYPE_READ)
- readTest(fs, fsConfig);
+ readTest(fs);
long execTime = System.currentTimeMillis() - tStart;
analyzeResult(fs, testType, execTime, resFileName);
} catch(Exception e) {
System.err.print(StringUtils.stringifyException(e));
- return -1;
+ System.exit(-1);
+ }
+ }
+
+ private static void analyzeResult( FileSystem fs,
+ int testType,
+ long execTime,
+ String resFileName
+ ) throws IOException {
+ Path reduceFile;
+ if (testType == TEST_TYPE_WRITE)
+ reduceFile = new Path(WRITE_DIR, "part-00000");
+ else
+ reduceFile = new Path(READ_DIR, "part-00000");
+ DataInputStream in;
+ in = new DataInputStream(fs.open(reduceFile));
+
+ BufferedReader lines;
+ lines = new BufferedReader(new InputStreamReader(in));
+ long tasks = 0;
+ long size = 0;
+ long time = 0;
+ float rate = 0;
+ float sqrate = 0;
+ String line;
+ while((line = lines.readLine()) != null) {
+ StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
+ String attr = tokens.nextToken();
+ if (attr.endsWith(":tasks"))
+ tasks = Long.parseLong(tokens.nextToken());
+ else if (attr.endsWith(":size"))
+ size = Long.parseLong(tokens.nextToken());
+ else if (attr.endsWith(":time"))
+ time = Long.parseLong(tokens.nextToken());
+ else if (attr.endsWith(":rate"))
+ rate = Float.parseFloat(tokens.nextToken());
+ else if (attr.endsWith(":sqrate"))
+ sqrate = Float.parseFloat(tokens.nextToken());
+ }
+
+ double med = rate / 1000 / tasks;
+ double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
+ String resultLines[] = {
+ "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
+ (testType == TEST_TYPE_READ) ? "read" :
+ "unknown"),
+ " Date & time: " + new Date(System.currentTimeMillis()),
+ " Number of files: " + tasks,
+ "Total MBytes processed: " + size/MEGA,
+ " Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
+ "Average IO rate mb/sec: " + med,
+ " IO rate std deviation: " + stdDev,
+ " Test exec time sec: " + (float)execTime / 1000,
+ "" };
+
+ PrintStream res = new PrintStream(
+ new FileOutputStream(
+ new File(resFileName), true));
+ for(int i = 0; i < resultLines.length; i++) {
+ LOG.info(resultLines[i]);
+ res.println(resultLines[i]);
}
- return 0;
+ }
+
+ private static void cleanup(FileSystem fs) throws IOException {
+ LOG.info("Cleaning up test files");
+ fs.delete(new Path(TEST_ROOT_DIR), true);
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/loadGenerator/TestLoadGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/loadGenerator/TestLoadGenerator.java?rev=1077041&r1=1077040&r2=1077041&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/loadGenerator/TestLoadGenerator.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/fs/loadGenerator/TestLoadGenerator.java Fri Mar 4 03:35:01 2011
@@ -23,19 +23,13 @@ import java.io.FileReader;
import java.io.FileWriter;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import static org.junit.Assert.*;
-
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.Test;
-
+import junit.framework.TestCase;
/**
* This class tests if a balancer schedules tasks correctly.
*/
-public class TestLoadGenerator extends Configured implements Tool {
+public class TestLoadGenerator extends TestCase {
private static final Configuration CONF = new Configuration();
private static final int DEFAULT_BLOCK_SIZE = 10;
private static final String OUT_DIR =
@@ -59,7 +53,6 @@ public class TestLoadGenerator extends C
}
/** Test if the structure generator works fine */
- @Test
public void testStructureGenerator() throws Exception {
StructureGenerator sg = new StructureGenerator();
String[] args = new String[]{"-maxDepth", "2", "-minWidth", "1",
@@ -128,17 +121,9 @@ public class TestLoadGenerator extends C
}
/** Test if the load generator works fine */
- @Test
public void testLoadGenerator() throws Exception {
final String TEST_SPACE_ROOT = "/test";
- final String SCRIPT_TEST_DIR = new File(System.getProperty("test.build.data",
- "/tmp")).getAbsolutePath();
- String script = SCRIPT_TEST_DIR + "/" + "loadgenscript";
- String script2 = SCRIPT_TEST_DIR + "/" + "loadgenscript2";
- File scriptFile1 = new File(script);
- File scriptFile2 = new File(script2);
-
FileWriter writer = new FileWriter(DIR_STRUCTURE_FILE);
writer.write(DIR_STRUCTURE_FIRST_LINE+"\n");
writer.write(DIR_STRUCTURE_SECOND_LINE+"\n");
@@ -214,54 +199,19 @@ public class TestLoadGenerator extends C
args[ELAPSED_TIME] = "-1";
assertEquals(-1, lg.run(args));
args[ELAPSED_TIME] = oldArg;
-
- // test scripted operation
- // Test with good script
- FileWriter fw = new FileWriter(scriptFile1);
- fw.write("2 .22 .33\n");
- fw.write("3 .10 .6\n");
- fw.write("6 0 .7\n");
- fw.close();
-
- String[] scriptArgs = new String[] {
- "-root", TEST_SPACE_ROOT, "-maxDelayBetweenOps", "0",
- "-numOfThreads", "10", "-startTime",
- Long.toString(System.currentTimeMillis()), "-scriptFile", script};
-
- assertEquals(0, lg.run(scriptArgs));
-
- // Test with bad script
- fw = new FileWriter(scriptFile2);
- fw.write("2 .22 .33\n");
- fw.write("3 blah blah blah .6\n");
- fw.write("6 0 .7\n");
- fw.close();
-
- scriptArgs[scriptArgs.length - 1] = script2;
- assertEquals(-1, lg.run(scriptArgs));
-
} finally {
cluster.shutdown();
DIR_STRUCTURE_FILE.delete();
FILE_STRUCTURE_FILE.delete();
- scriptFile1.delete();
- scriptFile2.delete();
}
}
/**
* @param args
*/
- public static void main(String[] args) throws Exception{
- int res = ToolRunner.run(new TestLoadGenerator(), args);
- System.exit(res);
- }
-
- @Override
- public int run(String[] args) throws Exception {
+ public static void main(String[] args) throws Exception {
TestLoadGenerator loadGeneratorTest = new TestLoadGenerator();
loadGeneratorTest.testStructureGenerator();
loadGeneratorTest.testLoadGenerator();
- return 0;
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBench.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBench.java?rev=1077041&r1=1077040&r2=1077041&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBench.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBench.java Fri Mar 4 03:35:01 2011
@@ -58,9 +58,6 @@ import org.apache.hadoop.mapred.OutputCo
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
/**
* This program executes a specified operation that applies load to
* the NameNode.
@@ -80,7 +77,7 @@ import org.apache.hadoop.util.ToolRunner
* must be run before running the other operations.
*/
-public class NNBench extends Configured implements Tool {
+public class NNBench {
private static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.hdfs.NNBench");
@@ -114,17 +111,14 @@ public class NNBench extends Configured
static SimpleDateFormat sdf =
new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss','S");
- // private static Configuration config = new Configuration();
+ private static Configuration config = new Configuration();
/**
* Clean up the files before a test run
*
* @throws IOException on error
*/
- private static void cleanupBeforeTestrun(
- Configuration config
- ) throws IOException {
-
+ private static void cleanupBeforeTestrun() throws IOException {
FileSystem tempFS = FileSystem.get(config);
// Delete the data directory only if it is the create/write operation
@@ -142,10 +136,7 @@ public class NNBench extends Configured
*
* @throws IOException on error
*/
- private static void createControlFiles(
- Configuration config
- ) throws IOException {
-
+ private static void createControlFiles() throws IOException {
FileSystem tempFS = FileSystem.get(config);
LOG.info("Creating " + numberOfMaps + " control files");
@@ -159,10 +150,13 @@ public class NNBench extends Configured
writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class,
LongWritable.class, CompressionType.NONE);
writer.append(new Text(strFileName), new LongWritable(0l));
+ } catch(Exception e) {
+ throw new IOException(e.getLocalizedMessage());
} finally {
if (writer != null) {
writer.close();
}
+ writer = null;
}
}
}
@@ -216,9 +210,6 @@ public class NNBench extends Configured
/**
* check for arguments and fail if the values are not specified
- * @param index positional number of an argument in the list of command
- * line's arguments
- * @param length total number of arguments
*/
public static void checkArgs(final int index, final int length) {
if (index == length) {
@@ -229,10 +220,10 @@ public class NNBench extends Configured
/**
* Parse input arguments
- *
- * @param args array of command line's parameters to be parsed
+ *
+ * @params args Command line inputs
*/
- public static void parseInputs(final String[] args, Configuration config) {
+ public static void parseInputs(final String[] args) {
// If there are no command line arguments, exit
if (args.length == 0) {
displayUsage();
@@ -316,10 +307,7 @@ public class NNBench extends Configured
*
* @throws IOException on error
*/
- private static void analyzeResults(
- Configuration config
- ) throws IOException {
-
+ private static void analyzeResults() throws IOException {
final FileSystem fs = FileSystem.get(config);
Path reduceFile = new Path(new Path(baseDir, OUTPUT_DIR_NAME),
"part-00000");
@@ -370,8 +358,8 @@ public class NNBench extends Configured
// Average latency is the average time to perform 'n' number of
// operations, n being the number of files
- double avgLatency1 = (double) totalTimeAL1 / successfulFileOps;
- double avgLatency2 = (double) totalTimeAL2 / successfulFileOps;
+ double avgLatency1 = (double) totalTimeAL1 / (double) successfulFileOps;
+ double avgLatency2 = (double) totalTimeAL2 / (double) successfulFileOps;
// The time it takes for the longest running map is measured. Using that,
// cluster transactions per second is calculated. It includes time to
@@ -379,7 +367,7 @@ public class NNBench extends Configured
double longestMapTimeTPmS = (double) (mapEndTimeTPmS - mapStartTimeTPmS);
double totalTimeTPS = (longestMapTimeTPmS == 0) ?
(1000 * successfulFileOps) :
- (double) (1000 * successfulFileOps) / longestMapTimeTPmS;
+ (double) (1000 * successfulFileOps) / (double) longestMapTimeTPmS;
// The time it takes to perform 'n' operations is calculated (in ms),
// n being the number of files. Using that time, the average execution
@@ -387,22 +375,22 @@ public class NNBench extends Configured
// failed operations
double AverageExecutionTime = (totalTimeTPmS == 0) ?
(double) successfulFileOps :
- (double) totalTimeTPmS / successfulFileOps;
+ (double) (totalTimeTPmS / successfulFileOps);
if (operation.equals(OP_CREATE_WRITE)) {
// For create/write/close, it is treated as two transactions,
// since a file create from a client perspective involves create and close
resultTPSLine1 = " TPS: Create/Write/Close: " +
(int) (totalTimeTPS * 2);
- resultTPSLine2 = "Avg exec time (ms): Create/Write/Close: " +
- AverageExecutionTime;
+ resultTPSLine2 = "Avg exec time (ms): Create/Write/Close: " +
+ (double) AverageExecutionTime;
resultALLine1 = " Avg Lat (ms): Create/Write: " + avgLatency1;
resultALLine2 = " Avg Lat (ms): Close: " + avgLatency2;
} else if (operation.equals(OP_OPEN_READ)) {
resultTPSLine1 = " TPS: Open/Read: " +
(int) totalTimeTPS;
resultTPSLine2 = " Avg Exec time (ms): Open/Read: " +
- AverageExecutionTime;
+ (double) AverageExecutionTime;
resultALLine1 = " Avg Lat (ms): Open: " + avgLatency1;
if (readFileAfterOpen) {
resultALLine2 = " Avg Lat (ms): Read: " + avgLatency2;
@@ -411,13 +399,13 @@ public class NNBench extends Configured
resultTPSLine1 = " TPS: Rename: " +
(int) totalTimeTPS;
resultTPSLine2 = " Avg Exec time (ms): Rename: " +
- AverageExecutionTime;
+ (double) AverageExecutionTime;
resultALLine1 = " Avg Lat (ms): Rename: " + avgLatency1;
} else if (operation.equals(OP_DELETE)) {
resultTPSLine1 = " TPS: Delete: " +
(int) totalTimeTPS;
resultTPSLine2 = " Avg Exec time (ms): Delete: " +
- AverageExecutionTime;
+ (double) AverageExecutionTime;
resultALLine1 = " Avg Lat (ms): Delete: " + avgLatency1;
}
@@ -470,7 +458,7 @@ public class NNBench extends Configured
*
* @throws IOException on error
*/
- public static void runTests(Configuration config) throws IOException {
+ public static void runTests() throws IOException {
config.setLong("io.bytes.per.checksum", bytesPerChecksum);
JobConf job = new JobConf(config, NNBench.class);
@@ -570,46 +558,36 @@ public class NNBench extends Configured
/**
* Main method for running the NNBench benchmarks
*
- * @param args array of command line arguments
* @throws IOException indicates a problem with test startup
*/
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new NNBench(), args);
- System.exit(res);
- }
-
- @Override
- public int run(String[] args) throws Exception {
- final Configuration config = getConf();
+ public static void main(String[] args) throws IOException {
// Display the application version string
displayVersion();
// Parse the inputs
- parseInputs(args, config);
+ parseInputs(args);
// Validate inputs
validateInputs();
// Clean up files before the test run
- cleanupBeforeTestrun(config);
+ cleanupBeforeTestrun();
// Create control files before test run
- createControlFiles(config);
+ createControlFiles();
// Run the tests as a map reduce job
- runTests(config);
+ runTests();
// Analyze results
- analyzeResults(config);
-
- return 0;
+ analyzeResults();
}
/**
* Mapper class
*/
- static class NNBenchMapper extends Configured
+ static class NNBenchMapper extends Configured
implements Mapper<Text, LongWritable, Text, Text> {
FileSystem filesystem = null;
private String hostName = null;
@@ -661,15 +639,13 @@ public class NNBench extends Configured
*/
public void close() throws IOException {
}
-
+
/**
- * Returns when the current number of seconds from the epoch equals
- * the command line argument given by <code>-startTime</code>.
- * This allows multiple instances of this program, running on clock
- * synchronized nodes, to start at roughly the same time.
- * @return true if the method was able to sleep for <code>-startTime</code>
- * without interruption; false otherwise
- */
+ * Returns when the current number of seconds from the epoch equals
+ * the command line argument given by <code>-startTime</code>.
+ * This allows multiple instances of this program, running on clock
+ * synchronized nodes, to start at roughly the same time.
+ */
private boolean barrier() {
long startTime = getConf().getLong("test.nnbench.starttime", 0l);
long currentTime = System.currentTimeMillis();
@@ -722,16 +698,16 @@ public class NNBench extends Configured
if (barrier()) {
if (op.equals(OP_CREATE_WRITE)) {
startTimeTPmS = System.currentTimeMillis();
- doCreateWriteOp("file_" + hostName + "_", reporter);
+ doCreateWriteOp("file_" + hostName + "_", output, reporter);
} else if (op.equals(OP_OPEN_READ)) {
startTimeTPmS = System.currentTimeMillis();
- doOpenReadOp("file_" + hostName + "_", reporter);
+ doOpenReadOp("file_" + hostName + "_", output, reporter);
} else if (op.equals(OP_RENAME)) {
startTimeTPmS = System.currentTimeMillis();
- doRenameOp("file_" + hostName + "_", reporter);
+ doRenameOp("file_" + hostName + "_", output, reporter);
} else if (op.equals(OP_DELETE)) {
startTimeTPmS = System.currentTimeMillis();
- doDeleteOp("file_" + hostName + "_", reporter);
+ doDeleteOp("file_" + hostName + "_", output, reporter);
}
endTimeTPms = System.currentTimeMillis();
@@ -759,13 +735,11 @@ public class NNBench extends Configured
/**
* Create and Write operation.
- * @param name of the prefix of the putput file to be created
- * @param reporter an instanse of (@link Reporter) to be used for
- * status' updates
*/
private void doCreateWriteOp(String name,
- Reporter reporter) {
- FSDataOutputStream out;
+ OutputCollector<Text, Text> output,
+ Reporter reporter) {
+ FSDataOutputStream out = null;
byte[] buffer = new byte[bytesToWrite];
for (long l = 0l; l < numberOfFiles; l++) {
@@ -809,13 +783,11 @@ public class NNBench extends Configured
/**
* Open operation
- * @param name of the prefix of the putput file to be read
- * @param reporter an instanse of (@link Reporter) to be used for
- * status' updates
*/
private void doOpenReadOp(String name,
- Reporter reporter) {
- FSDataInputStream input;
+ OutputCollector<Text, Text> output,
+ Reporter reporter) {
+ FSDataInputStream input = null;
byte[] buffer = new byte[bytesToWrite];
for (long l = 0l; l < numberOfFiles; l++) {
@@ -852,12 +824,10 @@ public class NNBench extends Configured
/**
* Rename operation
- * @param name of prefix of the file to be renamed
- * @param reporter an instanse of (@link Reporter) to be used for
- * status' updates
*/
private void doRenameOp(String name,
- Reporter reporter) {
+ OutputCollector<Text, Text> output,
+ Reporter reporter) {
for (long l = 0l; l < numberOfFiles; l++) {
Path filePath = new Path(new Path(baseDir, dataDirName),
name + "_" + l);
@@ -887,12 +857,10 @@ public class NNBench extends Configured
/**
* Delete operation
- * @param name of prefix of the file to be deleted
- * @param reporter an instanse of (@link Reporter) to be used for
- * status' updates
*/
private void doDeleteOp(String name,
- Reporter reporter) {
+ OutputCollector<Text, Text> output,
+ Reporter reporter) {
for (long l = 0l; l < numberOfFiles; l++) {
Path filePath = new Path(new Path(baseDir, dataDirName),
name + "_" + l);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBenchWithoutMR.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBenchWithoutMR.java?rev=1077041&r1=1077040&r2=1077041&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBenchWithoutMR.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/hdfs/NNBenchWithoutMR.java Fri Mar 4 03:35:01 2011
@@ -24,16 +24,12 @@ import java.util.Date;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.mapred.JobConf;
-
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.StringUtils;
/**
* This program executes a specified operation that applies load to
@@ -47,7 +43,7 @@ import org.apache.hadoop.util.ToolRunner
* This version does not use the map reduce framework
*
*/
-public class NNBenchWithoutMR extends Configured implements Tool {
+public class NNBenchWithoutMR {
private static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.hdfs.NNBench");
@@ -63,6 +59,7 @@ public class NNBenchWithoutMR extends Co
// variables initialized in main()
private static FileSystem fileSys = null;
private static Path taskDir = null;
+ private static String uniqueId = null;
private static byte[] buffer;
private static long maxExceptionsPerFile = 200;
@@ -72,14 +69,12 @@ public class NNBenchWithoutMR extends Co
* This allows multiple instances of this program, running on clock
* synchronized nodes, to start at roughly the same time.
*/
-
static void barrier() {
long sleepTime;
while ((sleepTime = startTime - System.currentTimeMillis()) > 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ex) {
- //This left empty on purpose
}
}
}
@@ -103,20 +98,18 @@ public class NNBenchWithoutMR extends Co
static int createWrite() {
int totalExceptions = 0;
FSDataOutputStream out = null;
- boolean success;
+ boolean success = false;
for (int index = 0; index < numFiles; index++) {
int singleFileExceptions = 0;
do { // create file until is succeeds or max exceptions reached
try {
out = fileSys.create(
- new Path(taskDir, "" + index), false, 512,
- (short)1, bytesPerBlock);
+ new Path(taskDir, "" + index), false, 512, (short)1, bytesPerBlock);
success = true;
} catch (IOException ioe) {
success=false;
totalExceptions++;
- handleException("creating file #" + index, ioe,
- ++singleFileExceptions);
+ handleException("creating file #" + index, ioe, ++singleFileExceptions);
}
} while (!success);
long toBeWritten = bytesPerFile;
@@ -127,8 +120,7 @@ public class NNBenchWithoutMR extends Co
out.write(buffer, 0, nbytes);
} catch (IOException ioe) {
totalExceptions++;
- handleException("writing to file #" + index, ioe,
- ++singleFileExceptions);
+ handleException("writing to file #" + index, ioe, ++singleFileExceptions);
}
}
do { // close file until is succeeds
@@ -138,8 +130,7 @@ public class NNBenchWithoutMR extends Co
} catch (IOException ioe) {
success=false;
totalExceptions++;
- handleException("closing file #" + index, ioe,
- ++singleFileExceptions);
+ handleException("closing file #" + index, ioe, ++singleFileExceptions);
}
} while (!success);
}
@@ -153,7 +144,7 @@ public class NNBenchWithoutMR extends Co
*/
static int openRead() {
int totalExceptions = 0;
- FSDataInputStream in;
+ FSDataInputStream in = null;
for (int index = 0; index < numFiles; index++) {
int singleFileExceptions = 0;
try {
@@ -162,12 +153,11 @@ public class NNBenchWithoutMR extends Co
while (toBeRead > 0) {
int nbytes = (int) Math.min(buffer.length, toBeRead);
toBeRead -= nbytes;
- try { // only try once && we don't care about a number of bytes read
+ try { // only try once
in.read(buffer, 0, nbytes);
} catch (IOException ioe) {
totalExceptions++;
- handleException("reading from file #" + index, ioe,
- ++singleFileExceptions);
+ handleException("reading from file #" + index, ioe, ++singleFileExceptions);
}
}
in.close();
@@ -187,23 +177,19 @@ public class NNBenchWithoutMR extends Co
*/
static int rename() {
int totalExceptions = 0;
- boolean success;
+ boolean success = false;
for (int index = 0; index < numFiles; index++) {
int singleFileExceptions = 0;
do { // rename file until is succeeds
try {
- // Possible result of this operation is at no interest to us for it
- // can return false only if the namesystem
- // could rename the path from the name
- // space (e.g. no Exception has been thrown)
- fileSys.rename(new Path(taskDir, "" + index),
- new Path(taskDir, "A" + index));
+ boolean result = fileSys.rename(
+ new Path(taskDir, "" + index), new Path(taskDir, "A" + index));
success = true;
- } catch (IOException ioe) {
- success = false;
+ } catch (IOException ioe) {
+ success=false;
totalExceptions++;
handleException("creating file #" + index, ioe, ++singleFileExceptions);
- }
+ }
} while (!success);
}
return totalExceptions;
@@ -217,18 +203,14 @@ public class NNBenchWithoutMR extends Co
*/
static int delete() {
int totalExceptions = 0;
- boolean success;
+ boolean success = false;
for (int index = 0; index < numFiles; index++) {
int singleFileExceptions = 0;
do { // delete file until is succeeds
try {
- // Possible result of this operation is at no interest to us for it
- // can return false only if namesystem
- // delete could remove the path from the name
- // space (e.g. no Exception has been thrown)
- fileSys.delete(new Path(taskDir, "A" + index), true);
+ boolean result = fileSys.delete(new Path(taskDir, "A" + index), true);
success = true;
- } catch (IOException ioe) {
+ } catch (IOException ioe) {
success=false;
totalExceptions++;
handleException("creating file #" + index, ioe, ++singleFileExceptions);
@@ -257,23 +239,15 @@ public class NNBenchWithoutMR extends Co
* [-bytesPerChecksum <value for io.bytes.per.checksum>]
* </pre>
*
- * @param args is an array of the program command line arguments
* @throws IOException indicates a problem with test startup
*/
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new NNBenchWithoutMR(), args);
- System.exit(res);
- }
-
- @Override
- public int run(String[] args) throws Exception {
-
+ public static void main(String[] args) throws IOException {
String version = "NameNodeBenchmark.0.3";
System.out.println(version);
int bytesPerChecksum = -1;
String usage =
- "Usage: NNBenchWithoutMR " +
+ "Usage: nnbench " +
" -operation <one of createWrite, openRead, rename, or delete> " +
" -baseDir <base output/input DFS path> " +
" -startTime <time to start, given in seconds from the epoch> " +
@@ -301,13 +275,13 @@ public class NNBenchWithoutMR extends Co
operation = args[++i];
} else {
System.out.println(usage);
- return -1;
+ System.exit(-1);
}
}
bytesPerFile = bytesPerBlock * blocksPerFile;
JobConf jobConf = new JobConf(new Configuration(), NNBench.class);
-
+
if ( bytesPerChecksum < 0 ) { // if it is not set in cmdline
bytesPerChecksum = jobConf.getInt("io.bytes.per.checksum", 512);
}
@@ -330,11 +304,11 @@ public class NNBenchWithoutMR extends Co
bytesPerBlock % bytesPerChecksum != 0)
{
System.err.println(usage);
- return -1;
+ System.exit(-1);
}
fileSys = FileSystem.get(jobConf);
- String uniqueId = java.net.InetAddress.getLocalHost().getHostName();
+ uniqueId = java.net.InetAddress.getLocalHost().getHostName();
taskDir = new Path(baseDir, uniqueId);
// initialize buffer used for writing/reading file
buffer = new byte[(int) Math.min(bytesPerFile, 32768L)];
@@ -359,14 +333,12 @@ public class NNBenchWithoutMR extends Co
exceptions = delete();
} else {
System.err.println(usage);
- return -1;
+ System.exit(-1);
}
endTime = new Date();
System.out.println("Job ended: " + endTime);
duration = (endTime.getTime() - execTime.getTime()) /1000;
System.out.println("The " + operation + " job took " + duration + " seconds.");
System.out.println("The job recorded " + exceptions + " exceptions.");
-
- return 0;
}
}