Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2010/06/04 04:08:31 UTC
svn commit: r951238 - in /hadoop/common/branches/branch-0.20: ./ src/test/org/apache/hadoop/fs/
Author: shv
Date: Fri Jun 4 02:08:30 2010
New Revision: 951238
URL: http://svn.apache.org/viewvc?rev=951238&view=rev
Log:
MAPREDUCE-1832. Merge -r 951232:951233 from trunk to branch-0.20.
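The merged change lets the DFSIO benchmark use file sizes below 1MB: sizes are now carried in bytes rather than megabytes, and -fileSize accepts an optional byte-multiple suffix. As a rough illustration of the semantics of the parseSize() and ByteMultiple code added in TestDFSIO.java below (values shown are worked out from the multipliers in the diff):

    // parseSize("512KB") == 512 * 0x400L    == 524288
    // parseSize("4MB")   == 4 * 0x100000L   == 4194304
    // parseSize("1")     == 1 * 0x100000L   == 1048576 (MB is the default multiple)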
Modified:
hadoop/common/branches/branch-0.20/CHANGES.txt
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java
Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Fri Jun 4 02:08:30 2010
@@ -39,6 +39,8 @@ Release 0.20.3 - Unreleased
MAPREDUCE-1407. Update javadoc in mapreduce.{Mapper,Reducer} to match
actual usage. (Benoit Sigoure via cdouglas)
+ MAPREDUCE-1832. Allow file sizes less than 1MB in DFSIO benchmark. (shv)
+
Release 0.20.2 - 2010-2-19
NEW FEATURES
Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java Fri Jun 4 02:08:30 2010
@@ -22,12 +22,8 @@ import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
/**
* Reducer that accumulates values based on their type.
@@ -45,14 +41,17 @@ import org.apache.hadoop.mapred.Reporter
* </ul>
*
*/
+@SuppressWarnings("deprecation")
public class AccumulatingReducer extends MapReduceBase
- implements Reducer<UTF8, UTF8, UTF8, UTF8> {
+ implements Reducer<Text, Text, Text, Text> {
+ static final String VALUE_TYPE_LONG = "l:";
+ static final String VALUE_TYPE_FLOAT = "f:";
+ static final String VALUE_TYPE_STRING = "s:";
private static final Log LOG = LogFactory.getLog(AccumulatingReducer.class);
protected String hostName;
public AccumulatingReducer () {
- LOG.info("Starting AccumulatingReducer !!!");
try {
hostName = java.net.InetAddress.getLocalHost().getHostName();
} catch(Exception e) {
@@ -61,9 +60,9 @@ public class AccumulatingReducer extends
LOG.info("Starting AccumulatingReducer on " + hostName);
}
- public void reduce(UTF8 key,
- Iterator<UTF8> values,
- OutputCollector<UTF8, UTF8> output,
+ public void reduce(Text key,
+ Iterator<Text> values,
+ OutputCollector<Text, Text> output,
Reporter reporter
) throws IOException {
String field = key.toString();
@@ -71,30 +70,30 @@ public class AccumulatingReducer extends
reporter.setStatus("starting " + field + " ::host = " + hostName);
// concatenate strings
- if (field.startsWith("s:")) {
- String sSum = "";
+ if (field.startsWith(VALUE_TYPE_STRING)) {
+ StringBuffer sSum = new StringBuffer();
while (values.hasNext())
- sSum += values.next().toString() + ";";
- output.collect(key, new UTF8(sSum));
+ sSum.append(values.next().toString()).append(";");
+ output.collect(key, new Text(sSum.toString()));
reporter.setStatus("finished " + field + " ::host = " + hostName);
return;
}
// sum long values
- if (field.startsWith("f:")) {
+ if (field.startsWith(VALUE_TYPE_FLOAT)) {
float fSum = 0;
while (values.hasNext())
fSum += Float.parseFloat(values.next().toString());
- output.collect(key, new UTF8(String.valueOf(fSum)));
+ output.collect(key, new Text(String.valueOf(fSum)));
reporter.setStatus("finished " + field + " ::host = " + hostName);
return;
}
// sum long values
- if (field.startsWith("l:")) {
+ if (field.startsWith(VALUE_TYPE_LONG)) {
long lSum = 0;
while (values.hasNext()) {
lSum += Long.parseLong(values.next().toString());
}
- output.collect(key, new UTF8(String.valueOf(lSum)));
+ output.collect(key, new Text(String.valueOf(lSum)));
}
reporter.setStatus("finished " + field + " ::host = " + hostName);
}
Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java Fri Jun 4 02:08:30 2010
@@ -18,18 +18,26 @@
package org.apache.hadoop.fs;
-import java.io.*;
-
-import junit.framework.TestCase;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
import java.util.Date;
import java.util.StringTokenizer;
-import org.apache.commons.logging.*;
+import junit.framework.TestCase;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.io.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
/**
* Distributed i/o benchmark.
@@ -60,6 +68,7 @@ import org.apache.hadoop.conf.*;
*/
public class DFSCIOTest extends TestCase {
// Constants
+ private static final Log LOG = LogFactory.getLog(DFSCIOTest.class);
private static final int TEST_TYPE_READ = 0;
private static final int TEST_TYPE_WRITE = 1;
private static final int TEST_TYPE_CLEANUP = 2;
@@ -67,7 +76,6 @@ public class DFSCIOTest extends TestCase
private static final String BASE_FILE_NAME = "test_io_";
private static final String DEFAULT_RES_FILE_NAME = "DFSCIOTest_results.log";
- private static final Log LOG = FileInputFormat.LOG;
private static Configuration fsConfig = new Configuration();
private static final long MEGA = 0x100000;
private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/DFSCIOTest");
@@ -124,9 +132,9 @@ public class DFSCIOTest extends TestCase
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
- UTF8.class, LongWritable.class,
+ Text.class, LongWritable.class,
CompressionType.NONE);
- writer.append(new UTF8(name), new LongWritable(fileSize));
+ writer.append(new Text(name), new LongWritable(fileSize));
} catch(Exception e) {
throw new IOException(e.getLocalizedMessage());
} finally {
@@ -154,26 +162,30 @@ public class DFSCIOTest extends TestCase
* <li>i/o rate squared</li>
* </ul>
*/
- private abstract static class IOStatMapper extends IOMapperBase {
+ private abstract static class IOStatMapper extends IOMapperBase<Long> {
IOStatMapper() {
- super(fsConfig);
}
- void collectStats(OutputCollector<UTF8, UTF8> output,
+ void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
- Object objSize) throws IOException {
- long totalSize = ((Long)objSize).longValue();
+ Long objSize) throws IOException {
+ long totalSize = objSize.longValue();
float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
LOG.info("Number of bytes processed = " + totalSize);
LOG.info("Exec time = " + execTime);
LOG.info("IO rate = " + ioRateMbSec);
- output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
- output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
- output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
- output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
- output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
+ new Text(String.valueOf(1)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
+ new Text(String.valueOf(totalSize)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
+ new Text(String.valueOf(execTime)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
+ new Text(String.valueOf(ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
+ new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
}
}
@@ -188,7 +200,7 @@ public class DFSCIOTest extends TestCase
buffer[i] = (byte)('0' + i % 50);
}
- public Object doIO(Reporter reporter,
+ public Long doIO(Reporter reporter,
String name,
long totalSize
) throws IOException {
@@ -274,8 +286,8 @@ public class DFSCIOTest extends TestCase
job.setReducerClass(AccumulatingReducer.class);
FileOutputFormat.setOutputPath(job, outputDir);
- job.setOutputKeyClass(UTF8.class);
- job.setOutputValueClass(UTF8.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
@@ -289,7 +301,7 @@ public class DFSCIOTest extends TestCase
super();
}
- public Object doIO(Reporter reporter,
+ public Long doIO(Reporter reporter,
String name,
long totalSize
) throws IOException {
Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java Fri Jun 4 02:08:30 2010
@@ -18,20 +18,28 @@
package org.apache.hadoop.fs;
-import java.io.*;
-
-import junit.framework.TestCase;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
import java.util.Date;
import java.util.StringTokenizer;
import java.util.TreeSet;
import java.util.Vector;
-import org.apache.commons.logging.*;
+import junit.framework.TestCase;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.io.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
/**
* Distributed checkup of the file system consistency.
@@ -45,6 +53,7 @@ import org.apache.hadoop.conf.*;
*/
public class DistributedFSCheck extends TestCase {
// Constants
+ private static final Log LOG = LogFactory.getLog(DistributedFSCheck.class);
private static final int TEST_TYPE_READ = 0;
private static final int TEST_TYPE_CLEANUP = 2;
private static final int DEFAULT_BUFFER_SIZE = 1000000;
@@ -52,7 +61,6 @@ public class DistributedFSCheck extends
private static final long MEGA = 0x100000;
private static Configuration fsConfig = new Configuration();
- private static final Log LOG = FileInputFormat.LOG;
private static Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/benchmarks/DistributedFSCheck"));
private static Path MAP_INPUT_DIR = new Path(TEST_ROOT_DIR, "map_input");
private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
@@ -92,7 +100,7 @@ public class DistributedFSCheck extends
Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
SequenceFile.Writer writer =
SequenceFile.createWriter(fs, fsConfig, inputFile,
- UTF8.class, LongWritable.class, CompressionType.NONE);
+ Text.class, LongWritable.class, CompressionType.NONE);
try {
nrFiles = 0;
@@ -112,7 +120,7 @@ public class DistributedFSCheck extends
long blockSize = fs.getDefaultBlockSize();
long fileLength = fs.getLength(rootFile);
for(long offset = 0; offset < fileLength; offset += blockSize)
- writer.append(new UTF8(rootFile.toString()), new LongWritable(offset));
+ writer.append(new Text(rootFile.toString()), new LongWritable(offset));
return;
}
@@ -126,10 +134,9 @@ public class DistributedFSCheck extends
/**
* DistributedFSCheck mapper class.
*/
- public static class DistributedFSCheckMapper extends IOMapperBase {
+ public static class DistributedFSCheckMapper extends IOMapperBase<Object> {
public DistributedFSCheckMapper() {
- super(fsConfig);
}
public Object doIO(Reporter reporter,
@@ -163,14 +170,17 @@ public class DistributedFSCheck extends
return new Long(actualSize);
}
- void collectStats(OutputCollector<UTF8, UTF8> output,
+ void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
Object corruptedBlock) throws IOException {
- output.collect(new UTF8("l:blocks"), new UTF8(String.valueOf(1)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "blocks"),
+ new Text(String.valueOf(1)));
if (corruptedBlock.getClass().getName().endsWith("String")) {
- output.collect(new UTF8("s:badBlocks"), new UTF8((String)corruptedBlock));
+ output.collect(
+ new Text(AccumulatingReducer.VALUE_TYPE_STRING + "badBlocks"),
+ new Text((String)corruptedBlock));
return;
}
long totalSize = ((Long)corruptedBlock).longValue();
@@ -179,9 +189,12 @@ public class DistributedFSCheck extends
LOG.info("Exec time = " + execTime);
LOG.info("IO rate = " + ioRateMbSec);
- output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
- output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
- output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
+ new Text(String.valueOf(totalSize)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
+ new Text(String.valueOf(execTime)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
+ new Text(String.valueOf(ioRateMbSec*1000)));
}
}
@@ -195,8 +208,8 @@ public class DistributedFSCheck extends
job.setReducerClass(AccumulatingReducer.class);
FileOutputFormat.setOutputPath(job, READ_DIR);
- job.setOutputKeyClass(UTF8.class);
- job.setOutputValueClass(UTF8.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java Fri Jun 4 02:08:30 2010
@@ -19,16 +19,10 @@ package org.apache.hadoop.fs;
import java.io.IOException;
import java.net.InetAddress;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
/**
* Base mapper class for IO operations.
@@ -39,16 +33,20 @@ import org.apache.hadoop.mapred.Reporter
* statistics data to be collected by subsequent reducers.
*
*/
-public abstract class IOMapperBase extends Configured
- implements Mapper<UTF8, LongWritable, UTF8, UTF8> {
+@SuppressWarnings("deprecation")
+public abstract class IOMapperBase<T> extends Configured
+ implements Mapper<Text, LongWritable, Text, Text> {
protected byte[] buffer;
protected int bufferSize;
protected FileSystem fs;
protected String hostName;
- public IOMapperBase(Configuration conf) {
- super(conf);
+ public IOMapperBase() {
+ }
+
+ public void configure(JobConf conf) {
+ setConf(conf);
try {
fs = FileSystem.get(conf);
} catch (Exception e) {
@@ -63,10 +61,6 @@ public abstract class IOMapperBase exten
}
}
- public void configure(JobConf job) {
- setConf(job);
- }
-
public void close() throws IOException {
}
@@ -80,7 +74,7 @@ public abstract class IOMapperBase exten
* {@link #collectStats(OutputCollector,String,long,Object)}
* @throws IOException
*/
- abstract Object doIO(Reporter reporter,
+ abstract T doIO(Reporter reporter,
String name,
long value) throws IOException;
@@ -93,10 +87,10 @@ public abstract class IOMapperBase exten
* @param doIOReturnValue value returned by {@link #doIO(Reporter,String,long)}
* @throws IOException
*/
- abstract void collectStats(OutputCollector<UTF8, UTF8> output,
+ abstract void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
- Object doIOReturnValue) throws IOException;
+ T doIOReturnValue) throws IOException;
/**
* Map file name and offset into statistical data.
@@ -111,9 +105,9 @@ public abstract class IOMapperBase exten
* {@link #collectStats(OutputCollector,String,long,Object)}
* is called to prepare stat data for a subsequent reducer.
*/
- public void map(UTF8 key,
+ public void map(Text key,
LongWritable value,
- OutputCollector<UTF8, UTF8> output,
+ OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String name = key.toString();
long longValue = value.get();
@@ -121,7 +115,7 @@ public abstract class IOMapperBase exten
reporter.setStatus("starting " + name + " ::host = " + hostName);
long tStart = System.currentTimeMillis();
- Object statValue = doIO(reporter, name, longValue);
+ T statValue = doIO(reporter, name, longValue);
long tEnd = System.currentTimeMillis();
long execTime = tEnd - tStart;
collectStats(output, name, execTime, statValue);
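With the new generic parameter, subclasses pin down the doIO() return type, and configuration now arrives through configure(JobConf) rather than a constructor argument. A minimal sketch of the new contract (class name and the no-op body are hypothetical; the signatures match the abstract methods above):

    class NoopMapper extends IOMapperBase<Long> {
      public NoopMapper() { }           // no-arg; fs is set up in configure(JobConf)

      @Override
      Long doIO(Reporter reporter, String name, long value) throws IOException {
        return Long.valueOf(value);     // pretend 'value' bytes were processed
      }

      @Override
      void collectStats(OutputCollector<Text, Text> output, String name,
                        long execTime, Long result) throws IOException {
        output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
                       new Text(String.valueOf(result.longValue())));
      }
    }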
Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java Fri Jun 4 02:08:30 2010
@@ -18,19 +18,31 @@
package org.apache.hadoop.fs;
-import java.io.*;
-
-import junit.framework.TestCase;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
import java.util.Date;
import java.util.StringTokenizer;
-import org.apache.commons.logging.*;
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
/**
* Distributed i/o benchmark.
@@ -59,23 +71,91 @@ import org.apache.hadoop.conf.*;
* <li>standard deviation of i/o rate </li>
* </ul>
*/
-public class TestDFSIO extends TestCase {
+public class TestDFSIO extends TestCase implements Tool {
// Constants
+ private static final Log LOG = LogFactory.getLog(TestDFSIO.class);
private static final int TEST_TYPE_READ = 0;
private static final int TEST_TYPE_WRITE = 1;
private static final int TEST_TYPE_CLEANUP = 2;
+ private static final int TEST_TYPE_APPEND = 3;
private static final int DEFAULT_BUFFER_SIZE = 1000000;
private static final String BASE_FILE_NAME = "test_io_";
private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
-
- private static final Log LOG = FileInputFormat.LOG;
- private static Configuration fsConfig = new Configuration();
- private static final long MEGA = 0x100000;
- private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO");
- private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control");
- private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write");
- private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
- private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data");
+ private static final long MEGA = ByteMultiple.MB.value();
+ private static final String USAGE =
+ "Usage: " + TestDFSIO.class.getSimpleName() +
+ " [genericOptions]" +
+ " -read | -write | -append | -clean [-nrFiles N]" +
+ " [-fileSize Size[B|KB|MB|GB|TB]]" +
+ " [-resFile resultFileName] [-bufferSize Bytes]" +
+ " [-rootDir]";
+
+ private Configuration config;
+
+ static{
+ Configuration.addDefaultResource("hdfs-default.xml");
+ Configuration.addDefaultResource("hdfs-site.xml");
+ Configuration.addDefaultResource("mapred-default.xml");
+ Configuration.addDefaultResource("mapred-site.xml");
+ }
+
+ static enum ByteMultiple {
+ B(1L),
+ KB(0x400L),
+ MB(0x100000L),
+ GB(0x40000000L),
+ TB(0x10000000000L);
+
+ private long multiplier;
+
+ private ByteMultiple(long mult) {
+ multiplier = mult;
+ }
+
+ long value() {
+ return multiplier;
+ }
+
+ static ByteMultiple parseString(String sMultiple) {
+ if(sMultiple == null || sMultiple.isEmpty()) // MB by default
+ return MB;
+ String sMU = sMultiple.toUpperCase();
+ if(B.name().toUpperCase().endsWith(sMU))
+ return B;
+ if(KB.name().toUpperCase().endsWith(sMU))
+ return KB;
+ if(MB.name().toUpperCase().endsWith(sMU))
+ return MB;
+ if(GB.name().toUpperCase().endsWith(sMU))
+ return GB;
+ if(TB.name().toUpperCase().endsWith(sMU))
+ return TB;
+ throw new IllegalArgumentException("Unsupported ByteMultiple "+sMultiple);
+ }
+ }
+
+ public TestDFSIO() {
+ this.config = new Configuration();
+ }
+
+ private static String getBaseDir(Configuration conf) {
+ return conf.get("test.build.data","/benchmarks/TestDFSIO");
+ }
+ private static Path getControlDir(Configuration conf) {
+ return new Path(getBaseDir(conf), "io_control");
+ }
+ private static Path getWriteDir(Configuration conf) {
+ return new Path(getBaseDir(conf), "io_write");
+ }
+ private static Path getReadDir(Configuration conf) {
+ return new Path(getBaseDir(conf), "io_read");
+ }
+ private static Path getAppendDir(Configuration conf) {
+ return new Path(getBaseDir(conf), "io_append");
+ }
+ private static Path getDataDir(Configuration conf) {
+ return new Path(getBaseDir(conf), "io_data");
+ }
/**
* Run the test with default parameters.
@@ -83,7 +163,8 @@ public class TestDFSIO extends TestCase
* @throws Exception
*/
public void testIOs() throws Exception {
- testIOs(10, 10);
+ TestDFSIO bench = new TestDFSIO();
+ bench.testIOs(1, 4);
}
/**
@@ -93,35 +174,54 @@ public class TestDFSIO extends TestCase
* @param nrFiles number of files
* @throws IOException
*/
- public static void testIOs(int fileSize, int nrFiles)
+ public void testIOs(int fileSize, int nrFiles)
throws IOException {
+ config.setBoolean("dfs.support.append", true);
+ MiniDFSCluster cluster = null;
+ try {
+ cluster = new MiniDFSCluster(config, 2, true, null);
+ FileSystem fs = cluster.getFileSystem();
- FileSystem fs = FileSystem.get(fsConfig);
+ createControlFile(fs, fileSize, nrFiles);
+ long tStart = System.currentTimeMillis();
+ writeTest(fs);
+ long execTime = System.currentTimeMillis() - tStart;
+ analyzeResult(fs, TEST_TYPE_WRITE, execTime, DEFAULT_RES_FILE_NAME);
- createControlFile(fs, fileSize, nrFiles);
- writeTest(fs);
- readTest(fs);
- cleanup(fs);
+ tStart = System.currentTimeMillis();
+ readTest(fs);
+ execTime = System.currentTimeMillis() - tStart;
+ analyzeResult(fs, TEST_TYPE_READ, execTime, DEFAULT_RES_FILE_NAME);
+
+ tStart = System.currentTimeMillis();
+ appendTest(fs);
+ execTime = System.currentTimeMillis() - tStart;
+ analyzeResult(fs, TEST_TYPE_APPEND, execTime, DEFAULT_RES_FILE_NAME);
+
+ cleanup(fs);
+ } finally {
+ if(cluster != null) cluster.shutdown();
+ }
}
- private static void createControlFile(
- FileSystem fs,
- int fileSize, // in MB
- int nrFiles
- ) throws IOException {
- LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files");
+ private void createControlFile(FileSystem fs,
+ long fileSize, // in bytes
+ int nrFiles
+ ) throws IOException {
+ LOG.info("creating control file: "+fileSize+" bytes, "+nrFiles+" files");
- fs.delete(CONTROL_DIR, true);
+ Path controlDir = getControlDir(config);
+ fs.delete(controlDir, true);
for(int i=0; i < nrFiles; i++) {
String name = getFileName(i);
- Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
+ Path controlFile = new Path(controlDir, "in_file_" + name);
SequenceFile.Writer writer = null;
try {
- writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
- UTF8.class, LongWritable.class,
+ writer = SequenceFile.createWriter(fs, config, controlFile,
+ Text.class, LongWritable.class,
CompressionType.NONE);
- writer.append(new UTF8(name), new LongWritable(fileSize));
+ writer.append(new Text(name), new LongWritable(fileSize));
} catch(Exception e) {
throw new IOException(e.getLocalizedMessage());
} finally {
@@ -149,48 +249,51 @@ public class TestDFSIO extends TestCase
* <li>i/o rate squared</li>
* </ul>
*/
- private abstract static class IOStatMapper extends IOMapperBase {
+ private abstract static class IOStatMapper<T> extends IOMapperBase<T> {
IOStatMapper() {
- super(fsConfig);
}
- void collectStats(OutputCollector<UTF8, UTF8> output,
+ void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
- Object objSize) throws IOException {
- long totalSize = ((Long)objSize).longValue();
+ Long objSize) throws IOException {
+ long totalSize = objSize.longValue();
float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
LOG.info("Number of bytes processed = " + totalSize);
LOG.info("Exec time = " + execTime);
LOG.info("IO rate = " + ioRateMbSec);
- output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
- output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
- output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
- output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
- output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
+ new Text(String.valueOf(1)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
+ new Text(String.valueOf(totalSize)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
+ new Text(String.valueOf(execTime)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
+ new Text(String.valueOf(ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
+ new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
}
}
/**
* Write mapper class.
*/
- public static class WriteMapper extends IOStatMapper {
+ public static class WriteMapper extends IOStatMapper<Long> {
public WriteMapper() {
- super();
for(int i=0; i < bufferSize; i++)
buffer[i] = (byte)('0' + i % 50);
}
- public Object doIO(Reporter reporter,
+ @Override
+ public Long doIO(Reporter reporter,
String name,
- long totalSize
- ) throws IOException {
+ long totalSize // in bytes
+ ) throws IOException {
// create file
- totalSize *= MEGA;
OutputStream out;
- out = fs.create(new Path(DATA_DIR, name), true, bufferSize);
+ out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
try {
// write to the file
@@ -205,57 +308,98 @@ public class TestDFSIO extends TestCase
} finally {
out.close();
}
- return new Long(totalSize);
+ return Long.valueOf(totalSize);
}
}
- private static void writeTest(FileSystem fs)
- throws IOException {
-
- fs.delete(DATA_DIR, true);
- fs.delete(WRITE_DIR, true);
+ private void writeTest(FileSystem fs) throws IOException {
+ Path writeDir = getWriteDir(config);
+ fs.delete(getDataDir(config), true);
+ fs.delete(writeDir, true);
- runIOTest(WriteMapper.class, WRITE_DIR);
+ runIOTest(WriteMapper.class, writeDir);
}
- private static void runIOTest( Class<? extends Mapper> mapperClass,
- Path outputDir
- ) throws IOException {
- JobConf job = new JobConf(fsConfig, TestDFSIO.class);
+ @SuppressWarnings("deprecation")
+ private void runIOTest(
+ Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
+ Path outputDir) throws IOException {
+ JobConf job = new JobConf(config, TestDFSIO.class);
- FileInputFormat.setInputPaths(job, CONTROL_DIR);
+ FileInputFormat.setInputPaths(job, getControlDir(config));
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(mapperClass);
job.setReducerClass(AccumulatingReducer.class);
FileOutputFormat.setOutputPath(job, outputDir);
- job.setOutputKeyClass(UTF8.class);
- job.setOutputValueClass(UTF8.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
/**
+ * Append mapper class.
+ */
+ public static class AppendMapper extends IOStatMapper<Long> {
+
+ public AppendMapper() {
+ for(int i=0; i < bufferSize; i++)
+ buffer[i] = (byte)('0' + i % 50);
+ }
+
+ public Long doIO(Reporter reporter,
+ String name,
+ long totalSize // in bytes
+ ) throws IOException {
+ // create file
+ OutputStream out;
+ out = fs.append(new Path(getDataDir(getConf()), name), bufferSize);
+
+ try {
+ // write to the file
+ long nrRemaining;
+ for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
+ int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
+ out.write(buffer, 0, curSize);
+ reporter.setStatus("writing " + name + "@" +
+ (totalSize - nrRemaining) + "/" + totalSize
+ + " ::host = " + hostName);
+ }
+ } finally {
+ out.close();
+ }
+ return Long.valueOf(totalSize);
+ }
+ }
+
+ private void appendTest(FileSystem fs) throws IOException {
+ Path appendDir = getAppendDir(config);
+ fs.delete(appendDir, true);
+ runIOTest(AppendMapper.class, appendDir);
+ }
+
+ /**
* Read mapper class.
*/
- public static class ReadMapper extends IOStatMapper {
+ public static class ReadMapper extends IOStatMapper<Long> {
public ReadMapper() {
- super();
}
- public Object doIO(Reporter reporter,
+ public Long doIO(Reporter reporter,
String name,
- long totalSize
- ) throws IOException {
- totalSize *= MEGA;
+ long totalSize // in bytes
+ ) throws IOException {
// open file
- DataInputStream in = fs.open(new Path(DATA_DIR, name));
+ DataInputStream in = fs.open(new Path(getDataDir(getConf()), name));
+ long actualSize = 0;
try {
- long actualSize = 0;
- for(int curSize = bufferSize; curSize == bufferSize;) {
+ for(int curSize = bufferSize;
+ curSize == bufferSize && actualSize < totalSize;) {
curSize = in.read(buffer, 0, bufferSize);
+ if(curSize < 0) break;
actualSize += curSize;
reporter.setStatus("reading " + name + "@" +
actualSize + "/" + totalSize
@@ -264,55 +408,73 @@ public class TestDFSIO extends TestCase
} finally {
in.close();
}
- return new Long(totalSize);
+ return Long.valueOf(actualSize);
}
}
- private static void readTest(FileSystem fs) throws IOException {
- fs.delete(READ_DIR, true);
- runIOTest(ReadMapper.class, READ_DIR);
+ private void readTest(FileSystem fs) throws IOException {
+ Path readDir = getReadDir(config);
+ fs.delete(readDir, true);
+ runIOTest(ReadMapper.class, readDir);
}
- private static void sequentialTest(
- FileSystem fs,
- int testType,
- int fileSize,
- int nrFiles
- ) throws Exception {
- IOStatMapper ioer = null;
+ private void sequentialTest(FileSystem fs,
+ int testType,
+ long fileSize, // in bytes
+ int nrFiles
+ ) throws IOException {
+ IOStatMapper<Long> ioer = null;
if (testType == TEST_TYPE_READ)
ioer = new ReadMapper();
else if (testType == TEST_TYPE_WRITE)
ioer = new WriteMapper();
+ else if (testType == TEST_TYPE_APPEND)
+ ioer = new AppendMapper();
else
return;
for(int i=0; i < nrFiles; i++)
ioer.doIO(Reporter.NULL,
BASE_FILE_NAME+Integer.toString(i),
- MEGA*fileSize);
+ fileSize);
}
public static void main(String[] args) {
+ TestDFSIO bench = new TestDFSIO();
+ int res = -1;
+ try {
+ res = ToolRunner.run(bench, args);
+ } catch(Exception e) {
+ System.err.print(StringUtils.stringifyException(e));
+ res = -2;
+ }
+ if(res == -1)
+ System.err.print(USAGE);
+ System.exit(res);
+ }
+
+ @Override // Tool
+ public int run(String[] args) throws IOException {
int testType = TEST_TYPE_READ;
int bufferSize = DEFAULT_BUFFER_SIZE;
- int fileSize = 1;
+ long fileSize = 1*MEGA;
int nrFiles = 1;
String resFileName = DEFAULT_RES_FILE_NAME;
boolean isSequential = false;
+ String version = TestDFSIO.class.getSimpleName() + ".0.0.6";
- String version="TestFDSIO.0.0.4";
- String usage = "Usage: TestFDSIO -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] ";
-
- System.out.println(version);
+ LOG.info(version);
if (args.length == 0) {
- System.err.println(usage);
- System.exit(-1);
+ System.err.println("Missing arguments.");
+ return -1;
}
+
for (int i = 0; i < args.length; i++) { // parse command line
if (args[i].startsWith("-read")) {
testType = TEST_TYPE_READ;
} else if (args[i].equals("-write")) {
testType = TEST_TYPE_WRITE;
+ } else if (args[i].equals("-append")) {
+ testType = TEST_TYPE_APPEND;
} else if (args[i].equals("-clean")) {
testType = TEST_TYPE_CLEANUP;
} else if (args[i].startsWith("-seq")) {
@@ -320,83 +482,120 @@ public class TestDFSIO extends TestCase
} else if (args[i].equals("-nrFiles")) {
nrFiles = Integer.parseInt(args[++i]);
} else if (args[i].equals("-fileSize")) {
- fileSize = Integer.parseInt(args[++i]);
+ fileSize = parseSize(args[++i]);
} else if (args[i].equals("-bufferSize")) {
bufferSize = Integer.parseInt(args[++i]);
} else if (args[i].equals("-resFile")) {
resFileName = args[++i];
+ } else {
+ System.err.println("Illegal argument: " + args[i]);
+ return -1;
}
}
LOG.info("nrFiles = " + nrFiles);
- LOG.info("fileSize (MB) = " + fileSize);
+ LOG.info("fileSize (MB) = " + toMB(fileSize));
LOG.info("bufferSize = " + bufferSize);
-
- try {
- fsConfig.setInt("test.io.file.buffer.size", bufferSize);
- FileSystem fs = FileSystem.get(fsConfig);
+ LOG.info("baseDir = " + getBaseDir(config));
- if (isSequential) {
- long tStart = System.currentTimeMillis();
- sequentialTest(fs, testType, fileSize, nrFiles);
- long execTime = System.currentTimeMillis() - tStart;
- String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
- LOG.info(resultLine);
- return;
- }
- if (testType == TEST_TYPE_CLEANUP) {
- cleanup(fs);
- return;
- }
- createControlFile(fs, fileSize, nrFiles);
+ config.setInt("test.io.file.buffer.size", bufferSize);
+ config.setBoolean("dfs.support.append", true);
+ FileSystem fs = FileSystem.get(config);
+
+ if (isSequential) {
long tStart = System.currentTimeMillis();
- if (testType == TEST_TYPE_WRITE)
- writeTest(fs);
- if (testType == TEST_TYPE_READ)
- readTest(fs);
+ sequentialTest(fs, testType, fileSize, nrFiles);
long execTime = System.currentTimeMillis() - tStart;
-
- analyzeResult(fs, testType, execTime, resFileName);
- } catch(Exception e) {
- System.err.print(StringUtils.stringifyException(e));
- System.exit(-1);
+ String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
+ LOG.info(resultLine);
+ return 0;
+ }
+ if (testType == TEST_TYPE_CLEANUP) {
+ cleanup(fs);
+ return 0;
}
- }
+ createControlFile(fs, fileSize, nrFiles);
+ long tStart = System.currentTimeMillis();
+ if (testType == TEST_TYPE_WRITE)
+ writeTest(fs);
+ if (testType == TEST_TYPE_READ)
+ readTest(fs);
+ if (testType == TEST_TYPE_APPEND)
+ appendTest(fs);
+ long execTime = System.currentTimeMillis() - tStart;
- private static void analyzeResult( FileSystem fs,
- int testType,
- long execTime,
- String resFileName
- ) throws IOException {
+ analyzeResult(fs, testType, execTime, resFileName);
+ return 0;
+ }
+
+ @Override // Configurable
+ public Configuration getConf() {
+ return this.config;
+ }
+
+ @Override // Configurable
+ public void setConf(Configuration conf) {
+ this.config = conf;
+ }
+
+ /**
+ * Returns size in bytes.
+ *
+ * @param arg = {d}[B|KB|MB|GB|TB]
+ * @return
+ */
+ static long parseSize(String arg) {
+ String[] args = arg.split("\\D", 2); // get digits
+ assert args.length <= 2;
+ long fileSize = Long.parseLong(args[0]);
+ String bytesMult = arg.substring(args[0].length()); // get byte multiple
+ return fileSize * ByteMultiple.parseString(bytesMult).value();
+ }
+
+ static float toMB(long bytes) {
+ return ((float)bytes)/MEGA;
+ }
+
+ private void analyzeResult( FileSystem fs,
+ int testType,
+ long execTime,
+ String resFileName
+ ) throws IOException {
Path reduceFile;
if (testType == TEST_TYPE_WRITE)
- reduceFile = new Path(WRITE_DIR, "part-00000");
- else
- reduceFile = new Path(READ_DIR, "part-00000");
- DataInputStream in;
- in = new DataInputStream(fs.open(reduceFile));
-
- BufferedReader lines;
- lines = new BufferedReader(new InputStreamReader(in));
+ reduceFile = new Path(getWriteDir(config), "part-00000");
+ else if (testType == TEST_TYPE_APPEND)
+ reduceFile = new Path(getAppendDir(config), "part-00000");
+ else // if (testType == TEST_TYPE_READ)
+ reduceFile = new Path(getReadDir(config), "part-00000");
long tasks = 0;
long size = 0;
long time = 0;
float rate = 0;
float sqrate = 0;
- String line;
- while((line = lines.readLine()) != null) {
- StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
- String attr = tokens.nextToken();
- if (attr.endsWith(":tasks"))
- tasks = Long.parseLong(tokens.nextToken());
- else if (attr.endsWith(":size"))
- size = Long.parseLong(tokens.nextToken());
- else if (attr.endsWith(":time"))
- time = Long.parseLong(tokens.nextToken());
- else if (attr.endsWith(":rate"))
- rate = Float.parseFloat(tokens.nextToken());
- else if (attr.endsWith(":sqrate"))
- sqrate = Float.parseFloat(tokens.nextToken());
+ DataInputStream in = null;
+ BufferedReader lines = null;
+ try {
+ in = new DataInputStream(fs.open(reduceFile));
+ lines = new BufferedReader(new InputStreamReader(in));
+ String line;
+ while((line = lines.readLine()) != null) {
+ StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
+ String attr = tokens.nextToken();
+ if (attr.endsWith(":tasks"))
+ tasks = Long.parseLong(tokens.nextToken());
+ else if (attr.endsWith(":size"))
+ size = Long.parseLong(tokens.nextToken());
+ else if (attr.endsWith(":time"))
+ time = Long.parseLong(tokens.nextToken());
+ else if (attr.endsWith(":rate"))
+ rate = Float.parseFloat(tokens.nextToken());
+ else if (attr.endsWith(":sqrate"))
+ sqrate = Float.parseFloat(tokens.nextToken());
+ }
+ } finally {
+ if(in != null) in.close();
+ if(lines != null) lines.close();
}
double med = rate / 1000 / tasks;
@@ -404,27 +603,32 @@ public class TestDFSIO extends TestCase
String resultLines[] = {
"----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
(testType == TEST_TYPE_READ) ? "read" :
+ (testType == TEST_TYPE_APPEND) ? "append" :
"unknown"),
" Date & time: " + new Date(System.currentTimeMillis()),
" Number of files: " + tasks,
- "Total MBytes processed: " + size/MEGA,
+ "Total MBytes processed: " + toMB(size),
" Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
"Average IO rate mb/sec: " + med,
" IO rate std deviation: " + stdDev,
" Test exec time sec: " + (float)execTime / 1000,
"" };
- PrintStream res = new PrintStream(
- new FileOutputStream(
- new File(resFileName), true));
- for(int i = 0; i < resultLines.length; i++) {
- LOG.info(resultLines[i]);
- res.println(resultLines[i]);
+ PrintStream res = null;
+ try {
+ res = new PrintStream(new FileOutputStream(new File(resFileName), true));
+ for(int i = 0; i < resultLines.length; i++) {
+ LOG.info(resultLines[i]);
+ res.println(resultLines[i]);
+ }
+ } finally {
+ if(res != null) res.close();
}
}
- private static void cleanup(FileSystem fs) throws IOException {
+ private void cleanup(FileSystem fs)
+ throws IOException {
LOG.info("Cleaning up test files");
- fs.delete(new Path(TEST_ROOT_DIR), true);
+ fs.delete(new Path(getBaseDir(config)), true);
}
}
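Since TestDFSIO now runs through ToolRunner, generic options are accepted and sub-megabyte file sizes can be given directly, per the USAGE string added above. A typical invocation against a 0.20 build would look something like the following (the test jar name varies by release):

    hadoop jar hadoop-*-test.jar TestDFSIO -write -nrFiles 10 -fileSize 512KB
    hadoop jar hadoop-*-test.jar TestDFSIO -read -nrFiles 10 -fileSize 512KB
    hadoop jar hadoop-*-test.jar TestDFSIO -clean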