You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2010/06/04 04:08:31 UTC

svn commit: r951238 - in /hadoop/common/branches/branch-0.20: ./ src/test/org/apache/hadoop/fs/

Author: shv
Date: Fri Jun  4 02:08:30 2010
New Revision: 951238

URL: http://svn.apache.org/viewvc?rev=951238&view=rev
Log:
MAPREDUCE-1832. Merge -r 951232:951233 from trunk to branch-0.20.

Modified:
    hadoop/common/branches/branch-0.20/CHANGES.txt
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java
    hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java

Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Fri Jun  4 02:08:30 2010
@@ -39,6 +39,8 @@ Release 0.20.3 - Unreleased
     MAPREDUCE-1407. Update javadoc in mapreduce.{Mapper,Reducer} to match
     actual usage. (Benoit Sigoure via cdouglas)
 
+    MAPREDUCE-1832. Allow file sizes less than 1MB in DFSIO benchmark. (shv)
+
 Release 0.20.2 - 2010-2-19
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/AccumulatingReducer.java Fri Jun  4 02:08:30 2010
@@ -22,12 +22,8 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
 
 /**
  * Reducer that accumulates values based on their type.
@@ -45,14 +41,17 @@ import org.apache.hadoop.mapred.Reporter
  * </ul>
  * 
  */
+@SuppressWarnings("deprecation")
 public class AccumulatingReducer extends MapReduceBase
-    implements Reducer<UTF8, UTF8, UTF8, UTF8> {
+    implements Reducer<Text, Text, Text, Text> {
+  static final String VALUE_TYPE_LONG = "l:";
+  static final String VALUE_TYPE_FLOAT = "f:";
+  static final String VALUE_TYPE_STRING = "s:";
   private static final Log LOG = LogFactory.getLog(AccumulatingReducer.class);
   
   protected String hostName;
   
   public AccumulatingReducer () {
-    LOG.info("Starting AccumulatingReducer !!!");
     try {
       hostName = java.net.InetAddress.getLocalHost().getHostName();
     } catch(Exception e) {
@@ -61,9 +60,9 @@ public class AccumulatingReducer extends
     LOG.info("Starting AccumulatingReducer on " + hostName);
   }
   
-  public void reduce(UTF8 key, 
-                     Iterator<UTF8> values,
-                     OutputCollector<UTF8, UTF8> output, 
+  public void reduce(Text key, 
+                     Iterator<Text> values,
+                     OutputCollector<Text, Text> output, 
                      Reporter reporter
                      ) throws IOException {
     String field = key.toString();
@@ -71,30 +70,30 @@ public class AccumulatingReducer extends
     reporter.setStatus("starting " + field + " ::host = " + hostName);
 
     // concatenate strings
-    if (field.startsWith("s:")) {
-      String sSum = "";
+    if (field.startsWith(VALUE_TYPE_STRING)) {
+      StringBuffer sSum = new StringBuffer();
       while (values.hasNext())
-        sSum += values.next().toString() + ";";
-      output.collect(key, new UTF8(sSum));
+        sSum.append(values.next().toString()).append(";");
+      output.collect(key, new Text(sSum.toString()));
       reporter.setStatus("finished " + field + " ::host = " + hostName);
       return;
     }
     // sum long values
-    if (field.startsWith("f:")) {
+    if (field.startsWith(VALUE_TYPE_FLOAT)) {
       float fSum = 0;
       while (values.hasNext())
         fSum += Float.parseFloat(values.next().toString());
-      output.collect(key, new UTF8(String.valueOf(fSum)));
+      output.collect(key, new Text(String.valueOf(fSum)));
       reporter.setStatus("finished " + field + " ::host = " + hostName);
       return;
     }
     // sum long values
-    if (field.startsWith("l:")) {
+    if (field.startsWith(VALUE_TYPE_LONG)) {
       long lSum = 0;
       while (values.hasNext()) {
         lSum += Long.parseLong(values.next().toString());
       }
-      output.collect(key, new UTF8(String.valueOf(lSum)));
+      output.collect(key, new Text(String.valueOf(lSum)));
     }
     reporter.setStatus("finished " + field + " ::host = " + hostName);
   }

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DFSCIOTest.java Fri Jun  4 02:08:30 2010
@@ -18,18 +18,26 @@
 
 package org.apache.hadoop.fs;
 
-import java.io.*;
-
-import junit.framework.TestCase;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
 import java.util.Date;
 import java.util.StringTokenizer;
 
-import org.apache.commons.logging.*;
+import junit.framework.TestCase;
 
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.io.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
 
 /**
  * Distributed i/o benchmark.
@@ -60,6 +68,7 @@ import org.apache.hadoop.conf.*;
  */
 public class DFSCIOTest extends TestCase {
   // Constants
+  private static final Log LOG = LogFactory.getLog(DFSCIOTest.class);
   private static final int TEST_TYPE_READ = 0;
   private static final int TEST_TYPE_WRITE = 1;
   private static final int TEST_TYPE_CLEANUP = 2;
@@ -67,7 +76,6 @@ public class DFSCIOTest extends TestCase
   private static final String BASE_FILE_NAME = "test_io_";
   private static final String DEFAULT_RES_FILE_NAME = "DFSCIOTest_results.log";
   
-  private static final Log LOG = FileInputFormat.LOG;
   private static Configuration fsConfig = new Configuration();
   private static final long MEGA = 0x100000;
   private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/DFSCIOTest");
@@ -124,9 +132,9 @@ public class DFSCIOTest extends TestCase
       SequenceFile.Writer writer = null;
       try {
         writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
-                                           UTF8.class, LongWritable.class,
+                                           Text.class, LongWritable.class,
                                            CompressionType.NONE);
-        writer.append(new UTF8(name), new LongWritable(fileSize));
+        writer.append(new Text(name), new LongWritable(fileSize));
       } catch(Exception e) {
         throw new IOException(e.getLocalizedMessage());
       } finally {
@@ -154,26 +162,30 @@ public class DFSCIOTest extends TestCase
    * <li>i/o rate squared</li>
    * </ul>
    */
-  private abstract static class IOStatMapper extends IOMapperBase {
+  private abstract static class IOStatMapper extends IOMapperBase<Long> {
     IOStatMapper() { 
-      super(fsConfig);
     }
     
-    void collectStats(OutputCollector<UTF8, UTF8> output, 
+    void collectStats(OutputCollector<Text, Text> output, 
                       String name,
                       long execTime, 
-                      Object objSize) throws IOException {
-      long totalSize = ((Long)objSize).longValue();
+                      Long objSize) throws IOException {
+      long totalSize = objSize.longValue();
       float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
       LOG.info("Number of bytes processed = " + totalSize);
       LOG.info("Exec time = " + execTime);
       LOG.info("IO rate = " + ioRateMbSec);
       
-      output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
-      output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
-      output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
-      output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
-      output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
+      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
+          new Text(String.valueOf(1)));
+      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
+          new Text(String.valueOf(totalSize)));
+      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
+          new Text(String.valueOf(execTime)));
+      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
+          new Text(String.valueOf(ioRateMbSec*1000)));
+      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
+          new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
     }
   }
 
@@ -188,7 +200,7 @@ public class DFSCIOTest extends TestCase
         buffer[i] = (byte)('0' + i % 50);
     }
 
-    public Object doIO(Reporter reporter, 
+    public Long doIO(Reporter reporter, 
                        String name, 
                        long totalSize 
                        ) throws IOException {
@@ -274,8 +286,8 @@ public class DFSCIOTest extends TestCase
     job.setReducerClass(AccumulatingReducer.class);
 
     FileOutputFormat.setOutputPath(job, outputDir);
-    job.setOutputKeyClass(UTF8.class);
-    job.setOutputValueClass(UTF8.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
     job.setNumReduceTasks(1);
     JobClient.runJob(job);
   }
@@ -289,7 +301,7 @@ public class DFSCIOTest extends TestCase
       super(); 
     }
 
-    public Object doIO(Reporter reporter, 
+    public Long doIO(Reporter reporter, 
                        String name, 
                        long totalSize 
                        ) throws IOException {

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/DistributedFSCheck.java Fri Jun  4 02:08:30 2010
@@ -18,20 +18,28 @@
 
 package org.apache.hadoop.fs;
 
-import java.io.*;
-
-import junit.framework.TestCase;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
 import java.util.Date;
 import java.util.StringTokenizer;
 import java.util.TreeSet;
 import java.util.Vector;
 
-import org.apache.commons.logging.*;
+import junit.framework.TestCase;
 
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.io.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
 
 /**
  * Distributed checkup of the file system consistency.
@@ -45,6 +53,7 @@ import org.apache.hadoop.conf.*;
  */
 public class DistributedFSCheck extends TestCase {
   // Constants
+  private static final Log LOG = LogFactory.getLog(DistributedFSCheck.class);
   private static final int TEST_TYPE_READ = 0;
   private static final int TEST_TYPE_CLEANUP = 2;
   private static final int DEFAULT_BUFFER_SIZE = 1000000;
@@ -52,7 +61,6 @@ public class DistributedFSCheck extends 
   private static final long MEGA = 0x100000;
   
   private static Configuration fsConfig = new Configuration();
-  private static final Log LOG = FileInputFormat.LOG;
   private static Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/benchmarks/DistributedFSCheck"));
   private static Path MAP_INPUT_DIR = new Path(TEST_ROOT_DIR, "map_input");
   private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
@@ -92,7 +100,7 @@ public class DistributedFSCheck extends 
     Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
     SequenceFile.Writer writer =
       SequenceFile.createWriter(fs, fsConfig, inputFile, 
-                                UTF8.class, LongWritable.class, CompressionType.NONE);
+                                Text.class, LongWritable.class, CompressionType.NONE);
     
     try {
       nrFiles = 0;
@@ -112,7 +120,7 @@ public class DistributedFSCheck extends 
       long blockSize = fs.getDefaultBlockSize();
       long fileLength = fs.getLength(rootFile);
       for(long offset = 0; offset < fileLength; offset += blockSize)
-        writer.append(new UTF8(rootFile.toString()), new LongWritable(offset));
+        writer.append(new Text(rootFile.toString()), new LongWritable(offset));
       return;
     }
     
@@ -126,10 +134,9 @@ public class DistributedFSCheck extends 
   /**
    * DistributedFSCheck mapper class.
    */
-  public static class DistributedFSCheckMapper extends IOMapperBase {
+  public static class DistributedFSCheckMapper extends IOMapperBase<Object> {
 
     public DistributedFSCheckMapper() { 
-      super(fsConfig); 
     }
 
     public Object doIO(Reporter reporter, 
@@ -163,14 +170,17 @@ public class DistributedFSCheck extends 
       return new Long(actualSize);
     }
     
-    void collectStats(OutputCollector<UTF8, UTF8> output, 
+    void collectStats(OutputCollector<Text, Text> output, 
                       String name, 
                       long execTime, 
                       Object corruptedBlock) throws IOException {
-      output.collect(new UTF8("l:blocks"), new UTF8(String.valueOf(1)));
+      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "blocks"),
+          new Text(String.valueOf(1)));
 
       if (corruptedBlock.getClass().getName().endsWith("String")) {
-        output.collect(new UTF8("s:badBlocks"), new UTF8((String)corruptedBlock));
+        output.collect(
+            new Text(AccumulatingReducer.VALUE_TYPE_STRING + "badBlocks"),
+            new Text((String)corruptedBlock));
         return;
       }
       long totalSize = ((Long)corruptedBlock).longValue();
@@ -179,9 +189,12 @@ public class DistributedFSCheck extends 
       LOG.info("Exec time = " + execTime);
       LOG.info("IO rate = " + ioRateMbSec);
       
-      output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
-      output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
-      output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
+      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
+          new Text(String.valueOf(totalSize)));
+      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
+          new Text(String.valueOf(execTime)));
+      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
+          new Text(String.valueOf(ioRateMbSec*1000)));
     }
   }
   
@@ -195,8 +208,8 @@ public class DistributedFSCheck extends 
     job.setReducerClass(AccumulatingReducer.class);
 
     FileOutputFormat.setOutputPath(job, READ_DIR);
-    job.setOutputKeyClass(UTF8.class);
-    job.setOutputValueClass(UTF8.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
     job.setNumReduceTasks(1);
     JobClient.runJob(job);
   }

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/IOMapperBase.java Fri Jun  4 02:08:30 2010
@@ -19,16 +19,10 @@ package org.apache.hadoop.fs;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
 
 /**
  * Base mapper class for IO operations.
@@ -39,16 +33,20 @@ import org.apache.hadoop.mapred.Reporter
  * statistics data to be collected by subsequent reducers.
  * 
  */
-public abstract class IOMapperBase extends Configured
-    implements Mapper<UTF8, LongWritable, UTF8, UTF8> {
+@SuppressWarnings("deprecation")
+public abstract class IOMapperBase<T> extends Configured
+    implements Mapper<Text, LongWritable, Text, Text> {
   
   protected byte[] buffer;
   protected int bufferSize;
   protected FileSystem fs;
   protected String hostName;
 
-  public IOMapperBase(Configuration conf) { 
-    super(conf); 
+  public IOMapperBase() { 
+  }
+
+  public void configure(JobConf conf) {
+    setConf(conf);
     try {
       fs = FileSystem.get(conf);
     } catch (Exception e) {
@@ -63,10 +61,6 @@ public abstract class IOMapperBase exten
     }
   }
 
-  public void configure(JobConf job) {
-    setConf(job);
-  }
-
   public void close() throws IOException {
   }
   
@@ -80,7 +74,7 @@ public abstract class IOMapperBase exten
    *          {@link #collectStats(OutputCollector,String,long,Object)}
    * @throws IOException
    */
-  abstract Object doIO(Reporter reporter, 
+  abstract T doIO(Reporter reporter, 
                        String name, 
                        long value) throws IOException;
 
@@ -93,10 +87,10 @@ public abstract class IOMapperBase exten
    * @param doIOReturnValue value returned by {@link #doIO(Reporter,String,long)}
    * @throws IOException
    */
-  abstract void collectStats(OutputCollector<UTF8, UTF8> output, 
+  abstract void collectStats(OutputCollector<Text, Text> output, 
                              String name, 
                              long execTime, 
-                             Object doIOReturnValue) throws IOException;
+                             T doIOReturnValue) throws IOException;
   
   /**
    * Map file name and offset into statistical data.
@@ -111,9 +105,9 @@ public abstract class IOMapperBase exten
    * {@link #collectStats(OutputCollector,String,long,Object)} 
    * is called to prepare stat data for a subsequent reducer.
    */
-  public void map(UTF8 key, 
+  public void map(Text key, 
                   LongWritable value,
-                  OutputCollector<UTF8, UTF8> output, 
+                  OutputCollector<Text, Text> output, 
                   Reporter reporter) throws IOException {
     String name = key.toString();
     long longValue = value.get();
@@ -121,7 +115,7 @@ public abstract class IOMapperBase exten
     reporter.setStatus("starting " + name + " ::host = " + hostName);
     
     long tStart = System.currentTimeMillis();
-    Object statValue = doIO(reporter, name, longValue);
+    T statValue = doIO(reporter, name, longValue);
     long tEnd = System.currentTimeMillis();
     long execTime = tEnd - tStart;
     collectStats(output, name, execTime, statValue);

Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java?rev=951238&r1=951237&r2=951238&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/fs/TestDFSIO.java Fri Jun  4 02:08:30 2010
@@ -18,19 +18,31 @@
 
 package org.apache.hadoop.fs;
 
-import java.io.*;
-
-import junit.framework.TestCase;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
 import java.util.Date;
 import java.util.StringTokenizer;
 
-import org.apache.commons.logging.*;
+import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Distributed i/o benchmark.
@@ -59,23 +71,91 @@ import org.apache.hadoop.conf.*;
  * <li>standard deviation of i/o rate </li>
  * </ul>
  */
-public class TestDFSIO extends TestCase {
+public class TestDFSIO extends TestCase implements Tool {
   // Constants
+  private static final Log LOG = LogFactory.getLog(TestDFSIO.class);
   private static final int TEST_TYPE_READ = 0;
   private static final int TEST_TYPE_WRITE = 1;
   private static final int TEST_TYPE_CLEANUP = 2;
+  private static final int TEST_TYPE_APPEND = 3;
   private static final int DEFAULT_BUFFER_SIZE = 1000000;
   private static final String BASE_FILE_NAME = "test_io_";
   private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
-  
-  private static final Log LOG = FileInputFormat.LOG;
-  private static Configuration fsConfig = new Configuration();
-  private static final long MEGA = 0x100000;
-  private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO");
-  private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control");
-  private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write");
-  private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
-  private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data");
+  private static final long MEGA = ByteMultiple.MB.value();
+  private static final String USAGE =
+                            "Usage: " + TestDFSIO.class.getSimpleName() +
+                            " [genericOptions]" +
+                            " -read | -write | -append | -clean [-nrFiles N]" +
+                            " [-fileSize Size[B|KB|MB|GB|TB]]" +
+                            " [-resFile resultFileName] [-bufferSize Bytes]" +
+                            " [-rootDir]";
+
+  private Configuration config;
+
+  static{
+    Configuration.addDefaultResource("hdfs-default.xml");
+    Configuration.addDefaultResource("hdfs-site.xml");
+    Configuration.addDefaultResource("mapred-default.xml");
+    Configuration.addDefaultResource("mapred-site.xml");
+  }
+
+  static enum ByteMultiple {
+    B(1L),
+    KB(0x400L),
+    MB(0x100000L),
+    GB(0x40000000L),
+    TB(0x10000000000L);
+
+    private long multiplier;
+
+    private ByteMultiple(long mult) {
+      multiplier = mult;
+    }
+
+    long value() {
+      return multiplier;
+    }
+
+    static ByteMultiple parseString(String sMultiple) {
+      if(sMultiple == null || sMultiple.isEmpty()) // MB by default
+        return MB;
+      String sMU = sMultiple.toUpperCase();
+      if(B.name().toUpperCase().endsWith(sMU))
+        return B;
+      if(KB.name().toUpperCase().endsWith(sMU))
+        return KB;
+      if(MB.name().toUpperCase().endsWith(sMU))
+        return MB;
+      if(GB.name().toUpperCase().endsWith(sMU))
+        return GB;
+      if(TB.name().toUpperCase().endsWith(sMU))
+        return TB;
+      throw new IllegalArgumentException("Unsupported ByteMultiple "+sMultiple);
+    }
+  }
+
+  public TestDFSIO() {
+    this.config = new Configuration();
+  }
+
+  private static String getBaseDir(Configuration conf) {
+    return conf.get("test.build.data","/benchmarks/TestDFSIO");
+  }
+  private static Path getControlDir(Configuration conf) {
+    return new Path(getBaseDir(conf), "io_control");
+  }
+  private static Path getWriteDir(Configuration conf) {
+    return new Path(getBaseDir(conf), "io_write");
+  }
+  private static Path getReadDir(Configuration conf) {
+    return new Path(getBaseDir(conf), "io_read");
+  }
+  private static Path getAppendDir(Configuration conf) {
+    return new Path(getBaseDir(conf), "io_append");
+  }
+  private static Path getDataDir(Configuration conf) {
+    return new Path(getBaseDir(conf), "io_data");
+  }
 
   /**
    * Run the test with default parameters.
@@ -83,7 +163,8 @@ public class TestDFSIO extends TestCase 
    * @throws Exception
    */
   public void testIOs() throws Exception {
-    testIOs(10, 10);
+    TestDFSIO bench = new TestDFSIO();
+    bench.testIOs(1, 4);
   }
 
   /**
@@ -93,35 +174,54 @@ public class TestDFSIO extends TestCase 
    * @param nrFiles number of files
    * @throws IOException
    */
-  public static void testIOs(int fileSize, int nrFiles)
+  public void testIOs(int fileSize, int nrFiles)
     throws IOException {
+    config.setBoolean("dfs.support.append", true);
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster(config, 2, true, null);
+      FileSystem fs = cluster.getFileSystem();
 
-    FileSystem fs = FileSystem.get(fsConfig);
+      createControlFile(fs, fileSize, nrFiles);
+      long tStart = System.currentTimeMillis();
+      writeTest(fs);
+      long execTime = System.currentTimeMillis() - tStart;
+      analyzeResult(fs, TEST_TYPE_WRITE, execTime, DEFAULT_RES_FILE_NAME);
 
-    createControlFile(fs, fileSize, nrFiles);
-    writeTest(fs);
-    readTest(fs);
-    cleanup(fs);
+      tStart = System.currentTimeMillis();
+      readTest(fs);
+      execTime = System.currentTimeMillis() - tStart;
+      analyzeResult(fs, TEST_TYPE_READ, execTime, DEFAULT_RES_FILE_NAME);
+
+      tStart = System.currentTimeMillis();
+      appendTest(fs);
+      execTime = System.currentTimeMillis() - tStart;
+      analyzeResult(fs, TEST_TYPE_APPEND, execTime, DEFAULT_RES_FILE_NAME);
+
+      cleanup(fs);
+    } finally {
+      if(cluster != null) cluster.shutdown();
+    }
   }
 
-  private static void createControlFile(
-                                        FileSystem fs,
-                                        int fileSize, // in MB 
-                                        int nrFiles
-                                        ) throws IOException {
-    LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files");
+  private void createControlFile(FileSystem fs,
+                                  long fileSize, // in bytes
+                                  int nrFiles
+                                ) throws IOException {
+    LOG.info("creating control file: "+fileSize+" bytes, "+nrFiles+" files");
 
-    fs.delete(CONTROL_DIR, true);
+    Path controlDir = getControlDir(config);
+    fs.delete(controlDir, true);
 
     for(int i=0; i < nrFiles; i++) {
       String name = getFileName(i);
-      Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
+      Path controlFile = new Path(controlDir, "in_file_" + name);
       SequenceFile.Writer writer = null;
       try {
-        writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
-                                           UTF8.class, LongWritable.class,
+        writer = SequenceFile.createWriter(fs, config, controlFile,
+                                           Text.class, LongWritable.class,
                                            CompressionType.NONE);
-        writer.append(new UTF8(name), new LongWritable(fileSize));
+        writer.append(new Text(name), new LongWritable(fileSize));
       } catch(Exception e) {
         throw new IOException(e.getLocalizedMessage());
       } finally {
@@ -149,48 +249,51 @@ public class TestDFSIO extends TestCase 
    * <li>i/o rate squared</li>
    * </ul>
    */
-  private abstract static class IOStatMapper extends IOMapperBase {
+  private abstract static class IOStatMapper<T> extends IOMapperBase<T> {
     IOStatMapper() { 
-      super(fsConfig);
     }
     
-    void collectStats(OutputCollector<UTF8, UTF8> output, 
+    void collectStats(OutputCollector<Text, Text> output, 
                       String name,
                       long execTime, 
-                      Object objSize) throws IOException {
-      long totalSize = ((Long)objSize).longValue();
+                      Long objSize) throws IOException {
+      long totalSize = objSize.longValue();
       float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
       LOG.info("Number of bytes processed = " + totalSize);
       LOG.info("Exec time = " + execTime);
       LOG.info("IO rate = " + ioRateMbSec);
       
-      output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
-      output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
-      output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
-      output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
-      output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
+      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
+          new Text(String.valueOf(1)));
+      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
+          new Text(String.valueOf(totalSize)));
+      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
+          new Text(String.valueOf(execTime)));
+      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
+          new Text(String.valueOf(ioRateMbSec*1000)));
+      output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
+          new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
     }
   }
 
   /**
    * Write mapper class.
    */
-  public static class WriteMapper extends IOStatMapper {
+  public static class WriteMapper extends IOStatMapper<Long> {
 
     public WriteMapper() { 
-      super(); 
       for(int i=0; i < bufferSize; i++)
         buffer[i] = (byte)('0' + i % 50);
     }
 
-    public Object doIO(Reporter reporter, 
+    @Override
+    public Long doIO(Reporter reporter, 
                        String name, 
-                       long totalSize 
-                       ) throws IOException {
+                       long totalSize // in bytes
+                     ) throws IOException {
       // create file
-      totalSize *= MEGA;
       OutputStream out;
-      out = fs.create(new Path(DATA_DIR, name), true, bufferSize);
+      out = fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
       
       try {
         // write to the file
@@ -205,57 +308,98 @@ public class TestDFSIO extends TestCase 
       } finally {
         out.close();
       }
-      return new Long(totalSize);
+      return Long.valueOf(totalSize);
     }
   }
 
-  private static void writeTest(FileSystem fs)
-    throws IOException {
-
-    fs.delete(DATA_DIR, true);
-    fs.delete(WRITE_DIR, true);
+  private void writeTest(FileSystem fs) throws IOException {
+    Path writeDir = getWriteDir(config);
+    fs.delete(getDataDir(config), true);
+    fs.delete(writeDir, true);
     
-    runIOTest(WriteMapper.class, WRITE_DIR);
+    runIOTest(WriteMapper.class, writeDir);
   }
   
-  private static void runIOTest( Class<? extends Mapper> mapperClass, 
-                                 Path outputDir
-                                 ) throws IOException {
-    JobConf job = new JobConf(fsConfig, TestDFSIO.class);
+  @SuppressWarnings("deprecation")
+  private void runIOTest(
+          Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass, 
+          Path outputDir) throws IOException {
+    JobConf job = new JobConf(config, TestDFSIO.class);
 
-    FileInputFormat.setInputPaths(job, CONTROL_DIR);
+    FileInputFormat.setInputPaths(job, getControlDir(config));
     job.setInputFormat(SequenceFileInputFormat.class);
 
     job.setMapperClass(mapperClass);
     job.setReducerClass(AccumulatingReducer.class);
 
     FileOutputFormat.setOutputPath(job, outputDir);
-    job.setOutputKeyClass(UTF8.class);
-    job.setOutputValueClass(UTF8.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
     job.setNumReduceTasks(1);
     JobClient.runJob(job);
   }
 
   /**
+   * Append mapper class: appends totalSize bytes to the named file in the data directory.
+   */
+  public static class AppendMapper extends IOStatMapper<Long> {
+
+    public AppendMapper() { 
+      for(int i=0; i < bufferSize; i++)
+        buffer[i] = (byte)('0' + i % 50);
+    }
+
+    public Long doIO(Reporter reporter, 
+                       String name, 
+                       long totalSize // in bytes
+                     ) throws IOException {
+      // open the file for append
+      OutputStream out;
+      out = fs.append(new Path(getDataDir(getConf()), name), bufferSize);
+      
+      try {
+        // write to the file
+        long nrRemaining;
+        for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
+          int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; 
+          out.write(buffer, 0, curSize);
+          reporter.setStatus("writing " + name + "@" + 
+                             (totalSize - nrRemaining) + "/" + totalSize 
+                             + " ::host = " + hostName);
+        }
+      } finally {
+        out.close();
+      }
+      return Long.valueOf(totalSize);
+    }
+  }
+
+  private void appendTest(FileSystem fs) throws IOException {
+    Path appendDir = getAppendDir(config);
+    fs.delete(appendDir, true);
+    runIOTest(AppendMapper.class, appendDir);
+  }
+
+  /**
    * Read mapper class.
    */
-  public static class ReadMapper extends IOStatMapper {
+  public static class ReadMapper extends IOStatMapper<Long> {
 
     public ReadMapper() { 
-      super(); 
     }
 
-    public Object doIO(Reporter reporter, 
+    public Long doIO(Reporter reporter, 
                        String name, 
-                       long totalSize 
-                       ) throws IOException {
-      totalSize *= MEGA;
+                       long totalSize // in bytes
+                     ) throws IOException {
       // open file
-      DataInputStream in = fs.open(new Path(DATA_DIR, name));
+      DataInputStream in = fs.open(new Path(getDataDir(getConf()), name));
+      long actualSize = 0;
       try {
-        long actualSize = 0;
-        for(int curSize = bufferSize; curSize == bufferSize;) {
+        for(int curSize = bufferSize;
+                curSize == bufferSize && actualSize < totalSize;) {
           curSize = in.read(buffer, 0, bufferSize);
+          if(curSize < 0) break;
           actualSize += curSize;
           reporter.setStatus("reading " + name + "@" + 
                              actualSize + "/" + totalSize 
@@ -264,55 +408,73 @@ public class TestDFSIO extends TestCase 
       } finally {
         in.close();
       }
-      return new Long(totalSize);
+      return Long.valueOf(actualSize);
     }
   }
 
-  private static void readTest(FileSystem fs) throws IOException {
-    fs.delete(READ_DIR, true);
-    runIOTest(ReadMapper.class, READ_DIR);
+  private void readTest(FileSystem fs) throws IOException {
+    Path readDir = getReadDir(config);
+    fs.delete(readDir, true);
+    runIOTest(ReadMapper.class, readDir);
   }
 
-  private static void sequentialTest(
-                                     FileSystem fs, 
-                                     int testType, 
-                                     int fileSize, 
-                                     int nrFiles
-                                     ) throws Exception {
-    IOStatMapper ioer = null;
+  private void sequentialTest(FileSystem fs, 
+                              int testType, 
+                              long fileSize, // in bytes
+                              int nrFiles
+                             ) throws IOException {
+    IOStatMapper<Long> ioer = null;
     if (testType == TEST_TYPE_READ)
       ioer = new ReadMapper();
     else if (testType == TEST_TYPE_WRITE)
       ioer = new WriteMapper();
+    else if (testType == TEST_TYPE_APPEND)
+      ioer = new AppendMapper();
     else
       return;
     for(int i=0; i < nrFiles; i++)
       ioer.doIO(Reporter.NULL,
                 BASE_FILE_NAME+Integer.toString(i), 
-                MEGA*fileSize);
+                fileSize);
   }
 
   public static void main(String[] args) {
+    TestDFSIO bench = new TestDFSIO();
+    int res = -1;
+    try {
+      res = ToolRunner.run(bench, args);
+    } catch(Exception e) {
+      System.err.print(StringUtils.stringifyException(e));
+      res = -2;
+    }
+    if(res == -1)
+      System.err.print(USAGE);
+    System.exit(res);
+  }
+
+  @Override // Tool
+  public int run(String[] args) throws IOException {
     int testType = TEST_TYPE_READ;
     int bufferSize = DEFAULT_BUFFER_SIZE;
-    int fileSize = 1;
+    long fileSize = 1*MEGA;
     int nrFiles = 1;
     String resFileName = DEFAULT_RES_FILE_NAME;
     boolean isSequential = false;
+    String version = TestDFSIO.class.getSimpleName() + ".0.0.6";
 
-    String version="TestFDSIO.0.0.4";
-    String usage = "Usage: TestFDSIO -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] ";
-    
-    System.out.println(version);
+    LOG.info(version);
     if (args.length == 0) {
-      System.err.println(usage);
-      System.exit(-1);
+      System.err.println("Missing arguments.");
+      return -1;
     }
+
     for (int i = 0; i < args.length; i++) {       // parse command line
       if (args[i].startsWith("-read")) {
         testType = TEST_TYPE_READ;
       } else if (args[i].equals("-write")) {
         testType = TEST_TYPE_WRITE;
+      } else if (args[i].equals("-append")) {
+        testType = TEST_TYPE_APPEND;
       } else if (args[i].equals("-clean")) {
         testType = TEST_TYPE_CLEANUP;
       } else if (args[i].startsWith("-seq")) {
@@ -320,83 +482,120 @@ public class TestDFSIO extends TestCase 
       } else if (args[i].equals("-nrFiles")) {
         nrFiles = Integer.parseInt(args[++i]);
       } else if (args[i].equals("-fileSize")) {
-        fileSize = Integer.parseInt(args[++i]);
+        fileSize = parseSize(args[++i]);
       } else if (args[i].equals("-bufferSize")) {
         bufferSize = Integer.parseInt(args[++i]);
       } else if (args[i].equals("-resFile")) {
         resFileName = args[++i];
+      } else {
+        System.err.println("Illegal argument: " + args[i]);
+        return -1;
       }
     }
 
     LOG.info("nrFiles = " + nrFiles);
-    LOG.info("fileSize (MB) = " + fileSize);
+    LOG.info("fileSize (MB) = " + toMB(fileSize));
     LOG.info("bufferSize = " + bufferSize);
-  
-    try {
-      fsConfig.setInt("test.io.file.buffer.size", bufferSize);
-      FileSystem fs = FileSystem.get(fsConfig);
+    LOG.info("baseDir = " + getBaseDir(config));
 
-      if (isSequential) {
-        long tStart = System.currentTimeMillis();
-        sequentialTest(fs, testType, fileSize, nrFiles);
-        long execTime = System.currentTimeMillis() - tStart;
-        String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
-        LOG.info(resultLine);
-        return;
-      }
-      if (testType == TEST_TYPE_CLEANUP) {
-        cleanup(fs);
-        return;
-      }
-      createControlFile(fs, fileSize, nrFiles);
+    config.setInt("test.io.file.buffer.size", bufferSize);
+    config.setBoolean("dfs.support.append", true);
+    FileSystem fs = FileSystem.get(config);
+
+    if (isSequential) {
       long tStart = System.currentTimeMillis();
-      if (testType == TEST_TYPE_WRITE)
-        writeTest(fs);
-      if (testType == TEST_TYPE_READ)
-        readTest(fs);
+      sequentialTest(fs, testType, fileSize, nrFiles);
       long execTime = System.currentTimeMillis() - tStart;
-    
-      analyzeResult(fs, testType, execTime, resFileName);
-    } catch(Exception e) {
-      System.err.print(StringUtils.stringifyException(e));
-      System.exit(-1);
+      String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
+      LOG.info(resultLine);
+      return 0;
+    }
+    if (testType == TEST_TYPE_CLEANUP) {
+      cleanup(fs);
+      return 0;
     }
-  }
+    createControlFile(fs, fileSize, nrFiles);
+    long tStart = System.currentTimeMillis();
+    if (testType == TEST_TYPE_WRITE)
+      writeTest(fs);
+    if (testType == TEST_TYPE_READ)
+      readTest(fs);
+    if (testType == TEST_TYPE_APPEND)
+      appendTest(fs);
+    long execTime = System.currentTimeMillis() - tStart;
   
-  private static void analyzeResult( FileSystem fs, 
-                                     int testType,
-                                     long execTime,
-                                     String resFileName
-                                     ) throws IOException {
+    analyzeResult(fs, testType, execTime, resFileName);
+    return 0;
+  }
+
+  @Override // Configurable
+  public Configuration getConf() {
+    return this.config;
+  }
+
+  @Override // Configurable
+  public void setConf(Configuration conf) {
+    this.config = conf;
+  }
+
+  /**
+   * Returns size in bytes.
+   * 
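+   * @param arg size string of the form {d}[B|KB|MB|GB|TB]: leading digits, then a byte-multiple suffix
+   * @return the size in bytes (numeric part multiplied by the value of the byte multiple)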
+   */
+  static long parseSize(String arg) {
+    String[] args = arg.split("\\D", 2);  // get digits
+    assert args.length <= 2;
+    long fileSize = Long.parseLong(args[0]);
+    String bytesMult = arg.substring(args[0].length()); // get byte multiple
+    return fileSize * ByteMultiple.parseString(bytesMult).value();
+  }
+
+  static float toMB(long bytes) {
+    return ((float)bytes)/MEGA;
+  }
+
+  private void analyzeResult(	FileSystem fs,
+                              int testType,
+                              long execTime,
+                              String resFileName
+                            ) throws IOException {
     Path reduceFile;
     if (testType == TEST_TYPE_WRITE)
-      reduceFile = new Path(WRITE_DIR, "part-00000");
-    else
-      reduceFile = new Path(READ_DIR, "part-00000");
-    DataInputStream in;
-    in = new DataInputStream(fs.open(reduceFile));
-  
-    BufferedReader lines;
-    lines = new BufferedReader(new InputStreamReader(in));
+      reduceFile = new Path(getWriteDir(config), "part-00000");
+    else if (testType == TEST_TYPE_APPEND)
+      reduceFile = new Path(getAppendDir(config), "part-00000");
+    else // if (testType == TEST_TYPE_READ)
+      reduceFile = new Path(getReadDir(config), "part-00000");
     long tasks = 0;
     long size = 0;
     long time = 0;
     float rate = 0;
     float sqrate = 0;
-    String line;
-    while((line = lines.readLine()) != null) {
-      StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
-      String attr = tokens.nextToken(); 
-      if (attr.endsWith(":tasks"))
-        tasks = Long.parseLong(tokens.nextToken());
-      else if (attr.endsWith(":size"))
-        size = Long.parseLong(tokens.nextToken());
-      else if (attr.endsWith(":time"))
-        time = Long.parseLong(tokens.nextToken());
-      else if (attr.endsWith(":rate"))
-        rate = Float.parseFloat(tokens.nextToken());
-      else if (attr.endsWith(":sqrate"))
-        sqrate = Float.parseFloat(tokens.nextToken());
+    DataInputStream in = null;
+    BufferedReader lines = null;
+    try {
+      in = new DataInputStream(fs.open(reduceFile));
+      lines = new BufferedReader(new InputStreamReader(in));
+      String line;
+      while((line = lines.readLine()) != null) {
+        StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
+        String attr = tokens.nextToken(); 
+        if (attr.endsWith(":tasks"))
+          tasks = Long.parseLong(tokens.nextToken());
+        else if (attr.endsWith(":size"))
+          size = Long.parseLong(tokens.nextToken());
+        else if (attr.endsWith(":time"))
+          time = Long.parseLong(tokens.nextToken());
+        else if (attr.endsWith(":rate"))
+          rate = Float.parseFloat(tokens.nextToken());
+        else if (attr.endsWith(":sqrate"))
+          sqrate = Float.parseFloat(tokens.nextToken());
+      }
+    } finally {
+      if(in != null) in.close();
+      if(lines != null) lines.close();
     }
     
     double med = rate / 1000 / tasks;
@@ -404,27 +603,32 @@ public class TestDFSIO extends TestCase 
     String resultLines[] = {
       "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
                                     (testType == TEST_TYPE_READ) ? "read" : 
+                                    (testType == TEST_TYPE_APPEND) ? "append" : 
                                     "unknown"),
       "           Date & time: " + new Date(System.currentTimeMillis()),
       "       Number of files: " + tasks,
-      "Total MBytes processed: " + size/MEGA,
+      "Total MBytes processed: " + toMB(size),
       "     Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
       "Average IO rate mb/sec: " + med,
       " IO rate std deviation: " + stdDev,
       "    Test exec time sec: " + (float)execTime / 1000,
       "" };
 
-    PrintStream res = new PrintStream(
-                                      new FileOutputStream(
-                                                           new File(resFileName), true)); 
-    for(int i = 0; i < resultLines.length; i++) {
-      LOG.info(resultLines[i]);
-      res.println(resultLines[i]);
+    PrintStream res = null;
+    try {
+      res = new PrintStream(new FileOutputStream(new File(resFileName), true)); 
+      for(int i = 0; i < resultLines.length; i++) {
+        LOG.info(resultLines[i]);
+        res.println(resultLines[i]);
+      }
+    } finally {
+      if(res != null) res.close();
     }
   }
 
-  private static void cleanup(FileSystem fs) throws IOException {
+  private void cleanup(FileSystem fs)
+  throws IOException {
     LOG.info("Cleaning up test files");
-    fs.delete(new Path(TEST_ROOT_DIR), true);
+    fs.delete(new Path(getBaseDir(config)), true);
   }
 }