Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/04 06:39:18 UTC
svn commit: r399543 - in /lucene/hadoop/trunk: ./
src/test/org/apache/hadoop/fs/ src/test/org/apache/hadoop/test/
Author: cutting
Date: Wed May 3 21:39:16 2006
New Revision: 399543
URL: http://svn.apache.org/viewcvs?rev=399543&view=rev
Log:
HADOOP-193 & HADOOP-194. A filesystem benchmark and a filesystem checker. Contributed by Konstantin.
Added:
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=399543&r1=399542&r2=399543&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed May 3 21:39:16 2006
@@ -177,6 +177,13 @@
WritableComparable, but is not yet otherwise used by
Hadoop. (Milind Bhandarkar via cutting)
+46. HADOOP-193. Add a MapReduce-based FileSystem benchmark.
+ (Konstantin Shvachko via cutting)
+
+47. HADOOP-194. Add a MapReduce-based FileSystem checker. This reads
+ every block in every file in the filesystem. (Konstantin Shvachko
+ via cutting)
+
Release 0.1.1 - 2006-04-08
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java?rev=399543&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/AccumulatingReducer.java Wed May 3 21:39:16 2006
@@ -0,0 +1,81 @@
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskTracker;
+
+/**
+ * Reducer that accumulates values based on their type.
+ * <p>
+ * The type is specified in the key part of the key-value pair
+ * as a prefix to the key in the following way
+ * <p>
+ * <tt>type:key</tt>
+ * <p>
+ * The values are accumulated according to the types:
+ * <ul>
+ * <li><tt>s:</tt> - string, concatenate</li>
+ * <li><tt>f:</tt> - float, sum</li>
+ * <li><tt>l:</tt> - long, sum</li>
+ * </ul>
+ *
+ * @author Konstantin Shvachko
+ */
+public class AccumulatingReducer extends MapReduceBase implements Reducer {
+ protected String hostName;
+
+ public AccumulatingReducer() {
+ try {
+ hostName = java.net.InetAddress.getLocalHost().getHostName();
+ } catch(Exception e) {
+ hostName = "localhost";
+ }
+ TaskTracker.LOG.info("Starting AccumulatingReducer on " + hostName);
+ }
+
+ public void reduce( WritableComparable key,
+ Iterator values,
+ OutputCollector output,
+ Reporter reporter
+ ) throws IOException {
+ String field = ((UTF8) key).toString();
+
+ reporter.setStatus("starting " + field + " ::host = " + hostName);
+
+ // concatenate strings
+ if (field.startsWith("s:")) {
+ String sSum = "";
+ while (values.hasNext())
+ sSum += ((UTF8) values.next()).toString() + ";";
+ output.collect(key, new UTF8(sSum));
+ reporter.setStatus("finished " + field + " ::host = " + hostName);
+ return;
+ }
+ // sum float values
+ if (field.startsWith("f:")) {
+ float fSum = 0;
+ while (values.hasNext())
+ fSum += Float.parseFloat(((UTF8) values.next()).toString());
+ output.collect(key, new UTF8(String.valueOf(fSum)));
+ reporter.setStatus("finished " + field + " ::host = " + hostName);
+ return;
+ }
+ // sum long values
+ if (field.startsWith("l:")) {
+ long lSum = 0;
+ while (values.hasNext()) {
+ lSum += Long.parseLong(((UTF8) values.next()).toString());
+ }
+ output.collect(key, new UTF8(String.valueOf(lSum)));
+ }
+ reporter.setStatus("finished " + field + " ::host = " + hostName);
+ }
+}
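The reducer above relies on mappers prefixing every key with its accumulation type. As a minimal sketch (hypothetical, not part of this commit; it assumes the same imports and 2006-era mapred API as the file above), a mapper feeding AccumulatingReducer might emit:

    // Hypothetical mapper emitting typed keys for AccumulatingReducer:
    // "l:" values are summed as longs, "f:" as floats, "s:" concatenated.
    public static class StatEmittingMapper extends MapReduceBase implements Mapper {
      public void map(WritableComparable key, Writable value,
                      OutputCollector output, Reporter reporter) throws IOException {
        output.collect(new UTF8("l:records"), new UTF8(String.valueOf(1)));
        output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(12.5f)));
        output.collect(new UTF8("s:hosts"), new UTF8("host1"));
      }
    }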
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java?rev=399543&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DistributedFSCheck.java Wed May 3 21:39:16 2006
@@ -0,0 +1,324 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.*;
+
+import junit.framework.TestCase;
+import java.util.logging.*;
+import java.util.Date;
+import java.util.StringTokenizer;
+import java.util.TreeSet;
+import java.util.Vector;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.*;
+
+/**
+ * Distributed check of file system consistency.
+ * <p>
+ * Tests file system consistency by reading each block of each file
+ * in the specified file tree, and reports corrupted blocks and
+ * general file statistics.
+ * <p>
+ * Optionally displays statistics on read performance.
+ *
+ * @author Konstantin Shvachko
+ */
+public class DistributedFSCheck extends TestCase {
+ // Constants
+ private static final int TEST_TYPE_READ = 0;
+ private static final int TEST_TYPE_CLEANUP = 2;
+ private static final int DEFAULT_BUFFER_SIZE = 1000000;
+ private static final String DEFAULT_RES_FILE_NAME = "DistributedFSCheck_results.log";
+ private static final long MEGA = 0x100000;
+
+ private static Configuration fsConfig = new Configuration();
+ private static final Logger LOG = InputFormatBase.LOG;
+ private static Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/benchmarks/DistributedFSCheck"));
+ private static Path MAP_INPUT_DIR = new Path(TEST_ROOT_DIR, "map_input");
+ private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
+
+ private FileSystem fs;
+ private long nrFiles;
+
+ DistributedFSCheck(Configuration conf) throws Exception {
+ fsConfig = conf;
+ this.fs = FileSystem.get( conf );
+ }
+
+ /**
+ * Run the distributed check over the entire file system.
+ *
+ * @throws Exception
+ */
+ public void testFSBlocks() throws Exception {
+ testFSBlocks("/");
+ }
+
+ /**
+ * Run the distributed check over the specified directory.
+ *
+ * @param rootName root directory name
+ * @throws Exception
+ */
+ public void testFSBlocks( String rootName ) throws Exception {
+ createInputFile(rootName);
+ runDistributedFSCheck();
+ }
+
+ private void createInputFile( String rootName ) throws IOException {
+ fs.delete(MAP_INPUT_DIR);
+
+ Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
+ SequenceFile.Writer writer =
+ new SequenceFile.Writer(fs, inputFile, UTF8.class, LongWritable.class);
+
+ try {
+ nrFiles = 0;
+ listSubtree( new Path( rootName ), writer );
+ } finally {
+ writer.close();
+ }
+ LOG.info( "Created map input files." );
+ }
+
+ private void listSubtree( Path rootFile,
+ SequenceFile.Writer writer
+ ) throws IOException {
+ if( ! fs.isDirectory(rootFile) ) {
+ nrFiles++;
+ // For a regular file generate <fName,offset> pairs
+ long blockSize = fs.getBlockSize();
+ long fileLength = fs.getLength( rootFile );
+ for( long offset = 0; offset < fileLength; offset += blockSize )
+ writer.append(new UTF8(rootFile.toString()), new LongWritable(offset));
+ return;
+ }
+
+ Path children[] = fs.listPaths( rootFile );
+ if( children == null )
+ throw new IOException("Could not get listing for " + rootFile);
+ for (int i = 0; i < children.length; i++)
+ listSubtree( children[i], writer );
+ }
+
+ /**
+ * DistributedFSCheck mapper class.
+ */
+ public static class DistributedFSCheckMapper extends IOMapperBase {
+
+ public DistributedFSCheckMapper() {
+ super(fsConfig);
+ }
+
+ public Object doIO( Reporter reporter,
+ String name,
+ long offset
+ ) throws IOException {
+ // open the file and seek to the block being checked
+ FSDataInputStream in = fs.open(new Path(name));
+ in.seek(offset);
+ long actualSize = 0;
+ try {
+ long blockSize = fs.getBlockSize();
+ int curSize = bufferSize;
+ for( actualSize = 0;
+ curSize == bufferSize && actualSize < blockSize;
+ actualSize += curSize) {
+ // read into the start of the buffer; 'offset' is a file position, not a buffer index
+ curSize = in.read( buffer, 0, Math.min(bufferSize, (int)(blockSize - actualSize)) );
+ reporter.setStatus( "reading " + name + "@" +
+ offset + "/" + blockSize );
+ }
+ } catch( IOException e ) {
+ LOG.info( "Corrupted block detected in \"" + name + "\" at " + offset );
+ return name + "@" + offset;
+ } finally {
+ in.close();
+ }
+ return new Long( actualSize );
+ }
+
+ void collectStats(OutputCollector output,
+ String name,
+ long execTime,
+ Object corruptedBlock ) throws IOException {
+ output.collect(new UTF8("l:blocks"), new UTF8(String.valueOf(1)));
+
+ if( corruptedBlock instanceof String ) {
+ output.collect(new UTF8("s:badBlocks"), new UTF8((String)corruptedBlock));
+ return;
+ }
+ long totalSize = ((Long)corruptedBlock).longValue();
+ float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
+ LOG.info( "Number of bytes processed = " + totalSize );
+ LOG.info( "Exec time = " + execTime );
+ LOG.info( "IO rate = " + ioRateMbSec );
+
+ output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
+ output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
+ output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
+ }
+ }
+
+ private void runDistributedFSCheck() throws Exception {
+ fs.delete(READ_DIR);
+ JobConf job = new JobConf( fs.getConf(), DistributedFSCheck.class );
+
+ job.setInputPath(MAP_INPUT_DIR);
+ job.setInputFormat(SequenceFileInputFormat.class);
+ job.setInputKeyClass(UTF8.class);
+ job.setInputValueClass(LongWritable.class);
+
+ job.setMapperClass(DistributedFSCheckMapper.class);
+ job.setReducerClass(AccumulatingReducer.class);
+
+ job.setOutputPath(READ_DIR);
+ job.setOutputKeyClass(UTF8.class);
+ job.setOutputValueClass(UTF8.class);
+ job.setNumReduceTasks(1);
+ JobClient.runJob(job);
+ }
+
+ public static void main(String[] args) throws Exception {
+ int testType = TEST_TYPE_READ;
+ int bufferSize = DEFAULT_BUFFER_SIZE;
+ String resFileName = DEFAULT_RES_FILE_NAME;
+ String rootName = "/";
+ boolean viewStats = false;
+
+ String usage = "Usage: DistributedFSCheck [-root name] [-clean] [-resFile resultFileName] [-bufferSize Bytes] [-stats] ";
+
+ if(args.length == 1 && args[0].startsWith("-h")) {
+ System.err.println(usage);
+ System.exit(-1);
+ }
+ for(int i = 0; i < args.length; i++) { // parse command line
+ if(args[i].equals("-root")) {
+ rootName = args[++i];
+ } else if (args[i].startsWith("-clean")) {
+ testType = TEST_TYPE_CLEANUP;
+ } else if(args[i].equals("-bufferSize")) {
+ bufferSize = Integer.parseInt(args[++i]);
+ } else if(args[i].equals("-resFile")) {
+ resFileName = args[++i];
+ } else if(args[i].startsWith("-stat")) {
+ viewStats = true;
+ }
+ }
+
+ LOG.info("root = " + rootName);
+ LOG.info("bufferSize = " + bufferSize);
+
+ Configuration conf = new Configuration();
+ conf.setInt("test.io.file.buffer.size", bufferSize);
+ DistributedFSCheck test = new DistributedFSCheck( conf );
+
+ if( testType == TEST_TYPE_CLEANUP ) {
+ test.cleanup();
+ return;
+ }
+ test.createInputFile( rootName );
+ long tStart = System.currentTimeMillis();
+ test.runDistributedFSCheck();
+ long execTime = System.currentTimeMillis() - tStart;
+
+ test.analyzeResult( execTime, resFileName, viewStats );
+ }
+
+ private void analyzeResult( long execTime,
+ String resFileName,
+ boolean viewStats
+ ) throws IOException {
+ Path reduceFile = new Path( READ_DIR, "part-00000" );
+ DataInputStream in = new DataInputStream(fs.open( reduceFile ));
+ BufferedReader lines = new BufferedReader(new InputStreamReader(in));
+ long blocks = 0;
+ long size = 0;
+ long time = 0;
+ float rate = 0;
+ StringTokenizer badBlocks = null;
+ long nrBadBlocks = 0;
+ String line;
+ while( (line = lines.readLine()) != null ) {
+ StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
+ String attr = tokens.nextToken();
+ if( attr.endsWith("blocks") )
+ blocks = Long.parseLong( tokens.nextToken() );
+ else if( attr.endsWith("size") )
+ size = Long.parseLong( tokens.nextToken() );
+ else if( attr.endsWith("time") )
+ time = Long.parseLong( tokens.nextToken() );
+ else if( attr.endsWith("rate") )
+ rate = Float.parseFloat( tokens.nextToken() );
+ else if( attr.endsWith("badBlocks") ) {
+ badBlocks = new StringTokenizer(tokens.nextToken(), ";");
+ nrBadBlocks = badBlocks.countTokens();
+ }
+ }
+
+ Vector resultLines = new Vector();
+ resultLines.add( "----- DistributedFSCheck ----- : " );
+ resultLines.add( " Date & time: " + new Date(System.currentTimeMillis()) );
+ resultLines.add( " Total number of blocks: " + blocks );
+ resultLines.add( " Total number of files: " + nrFiles );
+ resultLines.add( "Number of corrupted blocks: " + nrBadBlocks );
+
+ int nrBadFilesPos = resultLines.size();
+ TreeSet badFiles = new TreeSet();
+ long nrBadFiles = 0;
+ if( nrBadBlocks > 0 ) {
+ resultLines.add("" );
+ resultLines.add("----- Corrupted Blocks (file@offset) ----- : ");
+ while( badBlocks.hasMoreTokens() ) {
+ String curBlock = badBlocks.nextToken();
+ resultLines.add( curBlock );
+ badFiles.add( curBlock.substring(0, curBlock.indexOf('@')) );
+ }
+ nrBadFiles = badFiles.size();
+ }
+
+ resultLines.insertElementAt( " Number of corrupted files: " + nrBadFiles, nrBadFilesPos );
+
+ if( viewStats ) {
+ resultLines.add("" );
+ resultLines.add("----- Performance ----- : " );
+ resultLines.add(" Total MBytes read: " + size/MEGA );
+ resultLines.add(" Throughput mb/sec: " + (float)size * 1000.0 / (time * MEGA) );
+ resultLines.add(" Average IO rate mb/sec: " + rate / 1000 / blocks );
+ resultLines.add(" Test exec time sec: " + (float)execTime / 1000 );
+ }
+
+ PrintStream res = new PrintStream(
+ new FileOutputStream(
+ new File(resFileName), true ));
+ for( int i = 0; i < resultLines.size(); i++ ) {
+ String cur = (String)resultLines.get(i);
+ LOG.info( cur );
+ res.println( cur );
+ }
+ }
+
+ private void cleanup() throws Exception {
+ LOG.info( "Cleaning up test files" );
+ fs.delete(TEST_ROOT_DIR);
+ }
+}
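For illustration, the check can also be driven programmatically, much as main() does. A minimal sketch (hypothetical caller; it must live in org.apache.hadoop.fs, since the constructor is package-private, and run inside a method declared to throw Exception):

    Configuration conf = new Configuration();
    conf.setInt("test.io.file.buffer.size", 1000000);
    DistributedFSCheck check = new DistributedFSCheck(conf);
    // lists the subtree into map input files, then runs the MapReduce job
    check.testFSBlocks("/user");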
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java?rev=399543&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/IOMapperBase.java Wed May 3 21:39:16 2006
@@ -0,0 +1,113 @@
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Base mapper class for IO operations.
+ * <p>
+ * Two abstract methods, {@link #doIO(Reporter,String,long)} and
+ * {@link #collectStats(OutputCollector,String,long,Object)}, must be
+ * overridden in derived classes to define the i/o operation and the
+ * statistics to be collected by subsequent reducers.
+ *
+ * @author Konstantin Shvachko
+ */
+public abstract class IOMapperBase extends Configured implements Mapper {
+ protected byte[] buffer;
+ protected int bufferSize;
+ protected FileSystem fs;
+ protected String hostName;
+
+ public IOMapperBase(Configuration conf) {
+ super(conf);
+ try {
+ fs = FileSystem.get(conf);
+ } catch (Exception e) {
+ throw new RuntimeException( "Cannot create file system.", e );
+ }
+ bufferSize = conf.getInt("test.io.file.buffer.size", 4096);
+ buffer = new byte[bufferSize];
+ try {
+ hostName = InetAddress.getLocalHost().getHostName();
+ } catch(Exception e) {
+ hostName = "localhost";
+ }
+ }
+
+ public void configure(JobConf job) {
+ setConf(job);
+ }
+
+ public void close() throws IOException {
+ }
+
+ /**
+ * Perform an i/o operation, usually a read or a write.
+ *
+ * @param reporter
+ * @param name file name
+ * @param value offset within the file
+ * @return object that is passed as a parameter to
+ * {@link #collectStats(OutputCollector,String,long,Object)}
+ * @throws IOException
+ */
+ abstract Object doIO( Reporter reporter,
+ String name,
+ long value ) throws IOException;
+
+ /**
+ * Collect stat data to be combined by a subsequent reducer.
+ *
+ * @param output
+ * @param name file name
+ * @param execTime IO execution time
+ * @param doIOReturnValue value returned by {@link #doIO(Reporter,String,long)}
+ * @throws IOException
+ */
+ abstract void collectStats( OutputCollector output,
+ String name,
+ long execTime,
+ Object doIOReturnValue ) throws IOException;
+
+ /**
+ * Map file name and offset into statistical data.
+ * <p>
+ * The map method receives the
+ * <tt>key</tt>, which contains the file name, and the
+ * <tt>value</tt>, which is the offset within the file.
+ *
+ * These are passed to the abstract method
+ * {@link #doIO(Reporter,String,long)}, which performs the i/o operation
+ * (usually reading or writing data), and then
+ * {@link #collectStats(OutputCollector,String,long,Object)}
+ * is called to prepare the statistics for a subsequent reducer.
+ */
+ public void map(WritableComparable key,
+ Writable value,
+ OutputCollector output,
+ Reporter reporter) throws IOException {
+ String name = ((UTF8)key).toString();
+ long longValue = ((LongWritable)value).get();
+
+ reporter.setStatus("starting " + name + " ::host = " + hostName);
+
+ long tStart = System.currentTimeMillis();
+ Object statValue = doIO( reporter, name, longValue );
+ long tEnd = System.currentTimeMillis();
+ long execTime = tEnd - tStart;
+ collectStats( output, name, execTime, statValue );
+
+ reporter.setStatus("finished " + name + " ::host = " + hostName);
+ }
+}
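To sketch the contract, a hypothetical trivial subclass (not part of this commit) could look as follows; it must live in org.apache.hadoop.fs, since the abstract methods are package-private, and it assumes a static Configuration field like the fsConfig used by the mappers in this commit:

    // Hypothetical mapper: verifies that each named file exists and
    // reports one completed task plus its execution time per map.
    public static class ExistsMapper extends IOMapperBase {
      public ExistsMapper() { super(fsConfig); }

      Object doIO(Reporter reporter, String name, long value) throws IOException {
        return new Long(fs.exists(new Path(name)) ? 1 : 0);
      }

      void collectStats(OutputCollector output, String name,
                        long execTime, Object result) throws IOException {
        output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
        output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
      }
    }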
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java?rev=399543&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java Wed May 3 21:39:16 2006
@@ -0,0 +1,430 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.*;
+
+import junit.framework.TestCase;
+import java.util.logging.*;
+import java.util.Date;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.*;
+
+/**
+ * Distributed i/o benchmark.
+ * <p>
+ * This test writes into or reads from a specified number of files.
+ * File size is specified as a parameter to the test.
+ * Each file is accessed in a separate map task.
+ * <p>
+ * The reducer collects the following statistics:
+ * <ul>
+ * <li>number of tasks completed</li>
+ * <li>number of bytes written/read</li>
+ * <li>execution time</li>
+ * <li>io rate</li>
+ * <li>io rate squared</li>
+ * </ul>
+ *
+ * Finally, the following information is appended to a local file
+ * <ul>
+ * <li>read or write test</li>
+ * <li>date and time the test finished</li>
+ * <li>number of files</li>
+ * <li>total number of bytes processed</li>
+ * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li>
+ * <li>average i/o rate in mb/sec per file</li>
+ * <li>standard deviation of the i/o rate</li>
+ * </ul>
+ *
+ * @author Konstantin Shvachko
+ */
+public class TestDFSIO extends TestCase {
+ // Constants
+ private static final int TEST_TYPE_READ = 0;
+ private static final int TEST_TYPE_WRITE = 1;
+ private static final int TEST_TYPE_CLEANUP = 2;
+ private static final int DEFAULT_BUFFER_SIZE = 1000000;
+ private static final String BASE_FILE_NAME = "test_io_";
+ private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
+
+ private static final Logger LOG = InputFormatBase.LOG;
+ private static Configuration fsConfig = new Configuration();
+ private static final long MEGA = 0x100000;
+ private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO");
+ private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control");
+ private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write");
+ private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
+ private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data");
+
+ /**
+ * Run the test with default parameters.
+ *
+ * @throws Exception
+ */
+ public void testIOs() throws Exception {
+ testIOs(10, 10);
+ }
+
+ /**
+ * Run the test with the specified parameters.
+ *
+ * @param fileSize file size
+ * @param nrFiles number of files
+ * @throws IOException
+ */
+ public static void testIOs(int fileSize, int nrFiles)
+ throws IOException {
+
+ FileSystem fs = FileSystem.get(fsConfig);
+
+ createControlFile(fs, fileSize, nrFiles);
+ writeTest(fs);
+ readTest(fs);
+ }
+
+ private static void createControlFile(
+ FileSystem fs,
+ int fileSize, // in MB
+ int nrFiles
+ ) throws IOException {
+ LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files");
+
+ fs.delete(CONTROL_DIR);
+
+ for( int i=0; i < nrFiles; i++ ) {
+ String name = getFileName(i);
+ Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
+ SequenceFile.Writer writer = null;
+ try {
+ writer = new SequenceFile.Writer(fs, controlFile,
+ UTF8.class, LongWritable.class);
+ writer.append(new UTF8(name), new LongWritable(fileSize));
+ } catch(Exception e) {
+ throw new IOException(e.getLocalizedMessage());
+ } finally {
+ if( writer != null )
+ writer.close();
+ writer = null;
+ }
+ }
+ LOG.info("created control files for: "+nrFiles+" files");
+ }
+
+ private static String getFileName( int fIdx ) {
+ return BASE_FILE_NAME + Integer.toString(fIdx);
+ }
+
+ /**
+ * Write/Read mapper base class.
+ * <p>
+ * Collects the following statistics per task:
+ * <ul>
+ * <li>number of tasks completed</li>
+ * <li>number of bytes written/read</li>
+ * <li>execution time</li>
+ * <li>i/o rate</li>
+ * <li>i/o rate squared</li>
+ * </ul>
+ */
+ private abstract static class IOStatMapper extends IOMapperBase {
+ IOStatMapper() {
+ super(fsConfig);
+ }
+
+ void collectStats(OutputCollector output,
+ String name,
+ long execTime,
+ Object objSize ) throws IOException {
+ long totalSize = ((Long)objSize).longValue();
+ float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
+ LOG.info("Number of bytes processed = " + totalSize );
+ LOG.info("Exec time = " + execTime );
+ LOG.info("IO rate = " + ioRateMbSec );
+
+ output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
+ output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
+ output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
+ output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
+ output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
+ }
+ }
+
+ /**
+ * Write mapper class.
+ */
+ public static class WriteMapper extends IOStatMapper {
+
+ public WriteMapper() {
+ super();
+ for( int i=0; i < bufferSize; i++ )
+ buffer[i] = (byte)('0' + i % 50);
+ }
+
+ public Object doIO( Reporter reporter,
+ String name,
+ long totalSize
+ ) throws IOException {
+ // create file
+ totalSize *= MEGA;
+ OutputStream out;
+ out = fs.create(new Path(DATA_DIR, name), true, bufferSize);
+
+ try {
+ // write to the file
+ long nrRemaining;
+ for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize ) {
+ int curSize = ( bufferSize < nrRemaining ) ? bufferSize : (int)nrRemaining;
+ out.write( buffer, 0, curSize );
+ reporter.setStatus( "writing " + name + "@" +
+ (totalSize - nrRemaining) + "/" + totalSize
+ + " ::host = " + hostName);
+ }
+ } finally {
+ out.close();
+ }
+ return new Long(totalSize);
+ }
+ }
+
+ private static void writeTest(FileSystem fs)
+ throws IOException {
+
+ fs.delete(DATA_DIR);
+ fs.delete(WRITE_DIR);
+
+ runIOTest( WriteMapper.class, WRITE_DIR );
+ }
+
+ private static void runIOTest( Class mapperClass,
+ Path outputDir
+ ) throws IOException {
+ JobConf job = new JobConf( fsConfig, TestDFSIO.class );
+
+ job.setInputPath(CONTROL_DIR);
+ job.setInputFormat(SequenceFileInputFormat.class);
+ job.setInputKeyClass(UTF8.class);
+ job.setInputValueClass(LongWritable.class);
+
+ job.setMapperClass(mapperClass);
+ job.setReducerClass(AccumulatingReducer.class);
+
+ job.setOutputPath(outputDir);
+ job.setOutputKeyClass(UTF8.class);
+ job.setOutputValueClass(UTF8.class);
+ job.setNumReduceTasks(1);
+ JobClient.runJob(job);
+ }
+
+ /**
+ * Read mapper class.
+ */
+ public static class ReadMapper extends IOStatMapper {
+
+ public ReadMapper() {
+ super();
+ }
+
+ public Object doIO( Reporter reporter,
+ String name,
+ long totalSize
+ ) throws IOException {
+ totalSize *= MEGA;
+ // open file
+ DataInputStream in;
+ in = new DataInputStream(fs.open(new Path(DATA_DIR, name)));
+ try {
+ long actualSize = 0;
+ for( int curSize = bufferSize; curSize == bufferSize; ) {
+ curSize = in.read( buffer, 0, bufferSize );
+ actualSize += curSize;
+ reporter.setStatus( "reading " + name + "@" +
+ actualSize + "/" + totalSize
+ + " ::host = " + hostName);
+ }
+ } finally {
+ in.close();
+ }
+ return new Long(totalSize);
+ }
+ }
+
+ private static void readTest(FileSystem fs) throws IOException {
+ fs.delete(READ_DIR);
+ runIOTest( ReadMapper.class, READ_DIR );
+ }
+
+ private static void sequentialTest(
+ FileSystem fs,
+ int testType,
+ int fileSize,
+ int nrFiles
+ ) throws Exception {
+ IOStatMapper ioer = null;
+ if( testType == TEST_TYPE_READ )
+ ioer = new ReadMapper();
+ else if( testType == TEST_TYPE_WRITE )
+ ioer = new WriteMapper();
+ else
+ return;
+ for( int i=0; i < nrFiles; i++)
+ ioer.doIO(new Reporter() {
+ public void setStatus(String status) throws IOException {}
+ },
+ BASE_FILE_NAME+Integer.toString(i),
+ MEGA*fileSize );
+ }
+
+ public static void main(String[] args) {
+ int testType = TEST_TYPE_READ;
+ int bufferSize = DEFAULT_BUFFER_SIZE;
+ int fileSize = 1;
+ int nrFiles = 1;
+ String resFileName = DEFAULT_RES_FILE_NAME;
+ boolean isSequential = false;
+
+ String version="TestFDSIO.0.0.4";
+ String usage = "Usage: TestFDSIO -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] ";
+
+ System.out.println(version);
+ if (args.length == 0) {
+ System.err.println(usage);
+ System.exit(-1);
+ }
+ for (int i = 0; i < args.length; i++) { // parse command line
+ if (args[i].startsWith("-r")) {
+ testType = TEST_TYPE_READ;
+ } else if (args[i].startsWith("-w")) {
+ testType = TEST_TYPE_WRITE;
+ } else if (args[i].startsWith("-clean")) {
+ testType = TEST_TYPE_CLEANUP;
+ } else if (args[i].startsWith("-seq")) {
+ isSequential = true;
+ } else if (args[i].equals("-nrFiles")) {
+ nrFiles = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-fileSize")) {
+ fileSize = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-bufferSize")) {
+ bufferSize = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-resFile")) {
+ resFileName = args[++i];
+ }
+ }
+
+ LOG.info("nrFiles = " + nrFiles);
+ LOG.info("fileSize (MB) = " + fileSize);
+ LOG.info("bufferSize = " + bufferSize);
+
+ try {
+ fsConfig.setInt("test.io.file.buffer.size", bufferSize);
+ FileSystem fs = FileSystem.get(fsConfig);
+
+ if( isSequential ) {
+ long tStart = System.currentTimeMillis();
+ sequentialTest( fs, testType, fileSize, nrFiles );
+ long execTime = System.currentTimeMillis() - tStart;
+ String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
+ LOG.info( resultLine );
+ return;
+ }
+ if( testType == TEST_TYPE_CLEANUP ) {
+ cleanup( fs );
+ return;
+ }
+ createControlFile(fs, fileSize, nrFiles);
+ long tStart = System.currentTimeMillis();
+ if( testType == TEST_TYPE_WRITE )
+ writeTest(fs);
+ if( testType == TEST_TYPE_READ )
+ readTest(fs);
+ long execTime = System.currentTimeMillis() - tStart;
+
+ analyzeResult( fs, testType, execTime, resFileName );
+ } catch( Exception e ) {
+ System.err.print( e.getLocalizedMessage());
+ System.exit(-1);
+ }
+ }
+
+ private static void analyzeResult( FileSystem fs,
+ int testType,
+ long execTime,
+ String resFileName
+ ) throws IOException {
+ Path reduceFile;
+ if( testType == TEST_TYPE_WRITE )
+ reduceFile = new Path( WRITE_DIR, "part-00000" );
+ else
+ reduceFile = new Path( READ_DIR, "part-00000" );
+ DataInputStream in = new DataInputStream(fs.open( reduceFile ));
+ BufferedReader lines = new BufferedReader(new InputStreamReader(in));
+ long tasks = 0;
+ long size = 0;
+ long time = 0;
+ float rate = 0;
+ float sqrate = 0;
+ String line;
+ while( (line = lines.readLine()) != null ) {
+ StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
+ String attr = tokens.nextToken();
+ if( attr.endsWith(":tasks") )
+ tasks = Long.parseLong( tokens.nextToken() );
+ else if( attr.endsWith(":size") )
+ size = Long.parseLong( tokens.nextToken() );
+ else if( attr.endsWith(":time") )
+ time = Long.parseLong( tokens.nextToken() );
+ else if( attr.endsWith(":rate") )
+ rate = Float.parseFloat( tokens.nextToken() );
+ else if( attr.endsWith(":sqrate") )
+ sqrate = Float.parseFloat( tokens.nextToken() );
+ }
+
+ double med = rate / 1000 / tasks;
+ double stdDev = Math.sqrt( Math.abs(sqrate / 1000 / tasks - med*med ));
+ String resultLines[] = {
+ "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
+ (testType == TEST_TYPE_READ) ? "read" :
+ "unknown"),
+ " Date & time: " + new Date(System.currentTimeMillis()),
+ " Number of files: " + tasks,
+ "Total MBytes processed: " + size/MEGA,
+ " Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
+ "Average IO rate mb/sec: " + med,
+ " Std IO rate deviation: " + stdDev,
+ " Test exec time sec: " + (float)execTime / 1000,
+ "" };
+
+ PrintStream res = new PrintStream(
+ new FileOutputStream(
+ new File(resFileName), true ));
+ for( int i = 0; i < resultLines.length; i++ ) {
+ LOG.info( resultLines[i] );
+ res.println( resultLines[i] );
+ }
+ }
+
+ private static void cleanup( FileSystem fs ) throws Exception {
+ LOG.info( "Cleaning up test files" );
+ fs.delete(new Path(TEST_ROOT_DIR));
+ }
+}
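Since the benchmark is registered with AllTestDriver below, it can be launched by name. A typical invocation might look like the following; this is an assumption about the launch path (the bin/hadoop script falling through to an arbitrary class name), not something specified by this commit:

    bin/hadoop org.apache.hadoop.test.AllTestDriver TestDFSIO \
        -write -nrFiles 10 -fileSize 100 -resFile /tmp/TestDFSIO_results.log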
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java?rev=399543&r1=399542&r2=399543&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java Wed May 3 21:39:16 2006
@@ -17,12 +17,6 @@
package org.apache.hadoop.test;
import org.apache.hadoop.util.ProgramDriver;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.TreeMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
import org.apache.hadoop.mapred.TestMapRed;
import org.apache.hadoop.mapred.TestTextInputFormat;
import org.apache.hadoop.mapred.TestSequenceFileInputFormat;
@@ -33,6 +27,7 @@
import org.apache.hadoop.io.TestSequenceFile;
import org.apache.hadoop.ipc.TestIPC;
import org.apache.hadoop.ipc.TestRPC;
+import org.apache.hadoop.fs.TestDFSIO;
public class AllTestDriver {
@@ -54,6 +49,7 @@
pgd.addClass("testipc", TestIPC.class, "A test for ipc.");
pgd.addClass("testsequencefileinputformat", TestSequenceFileInputFormat.class, "A test for sequence file input format.");
pgd.addClass("testtextinputformat", TestTextInputFormat.class, "A test for text input format.");
+ pgd.addClass("TestDFSIO", TestDFSIO.class, "Distributed i/o benchmark.");
pgd.driver(argv);
}
catch(Throwable e){