You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/11/09 03:47:34 UTC

svn commit: r1540245 - /hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java

Author: tedyu
Date: Sat Nov  9 02:47:33 2013
New Revision: 1540245

URL: http://svn.apache.org/r1540245
Log:
HBASE-9808 org.apache.hadoop.hbase.rest.PerformanceEvaluation is out of sync with org.apache.hadoop.hbase.PerformanceEvaluation


Modified:
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java?rev=1540245&r1=1540244&r2=1540245&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java Sat Nov  9 02:47:33 2013
@@ -22,21 +22,22 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.lang.reflect.Constructor;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
-import java.util.Arrays;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.lang.reflect.Constructor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,21 +46,28 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.rest.client.Client;
 import org.apache.hadoop.hbase.rest.client.Cluster;
 import org.apache.hadoop.hbase.rest.client.RemoteAdmin;
-import org.apache.hadoop.hbase.rest.client.RemoteHTable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Hash;
 import org.apache.hadoop.hbase.util.MurmurHash;
@@ -78,6 +86,8 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
 import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Script used evaluating Stargate performance and scalability.  Runs a SG
@@ -86,32 +96,30 @@ import org.apache.hadoop.util.LineReader
  * command-line which test to run and how many clients are participating in
  * this experiment. Run <code>java PerformanceEvaluation --help</code> to
  * obtain usage.
- * 
+ *
  * <p>This class sets up and runs the evaluation programs described in
  * Section 7, <i>Performance Evaluation</i>, of the <a
  * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
  * paper, pages 8-10.
- * 
+ *
  * <p>If number of clients > 1, we start up a MapReduce job. Each map task
  * runs an individual client. Each client does about 1GB of data.
  */
-public class PerformanceEvaluation  {
+public class PerformanceEvaluation extends Configured implements Tool {
   protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
-  
+
+  private static final int DEFAULT_ROW_PREFIX_LENGTH = 16;
   private static final int ROW_LENGTH = 1000;
+  private static final int TAG_LENGTH = 256;
   private static final int ONE_GB = 1024 * 1024 * 1000;
   private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;
-  
-  public static final byte [] TABLE_NAME = Bytes.toBytes("TestTable");
+
+  public static final TableName TABLE_NAME = TableName.valueOf("TestTable");
   public static final byte [] FAMILY_NAME = Bytes.toBytes("info");
   public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
+  private TableName tableName = TABLE_NAME;
 
-  protected static final HTableDescriptor TABLE_DESCRIPTOR;
-  static {
-    TABLE_DESCRIPTOR = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
-    TABLE_DESCRIPTOR.addFamily(new HColumnDescriptor(FAMILY_NAME));
-  }
-
+  protected HTableDescriptor TABLE_DESCRIPTOR;
   protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
   protected static Cluster cluster = new Cluster();
 
@@ -119,18 +127,30 @@ public class PerformanceEvaluation  {
   private boolean nomapred = false;
   private int N = 1;
   private int R = ROWS_PER_GB;
-  private int B = 100;
+  private Compression.Algorithm compression = Compression.Algorithm.NONE;
+  private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
+  private boolean flushCommits = true;
+  private boolean writeToWAL = true;
+  private boolean inMemoryCF = false;
+  private int presplitRegions = 0;
+  private boolean useTags = false;
+  private int noOfTags = 1;
+  private HConnection connection;
 
   private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
   /**
    * Regex to parse lines in input file passed to mapreduce task.
    */
   public static final Pattern LINE_PATTERN =
-    Pattern.compile("startRow=(\\d+),\\s+" +
-        "perClientRunRows=(\\d+),\\s+" +
-        "totalRows=(\\d+),\\s+" + 
-        "clients=(\\d+),\\s+" + 
-        "rowsPerPut=(\\d+)");
+      Pattern.compile("tableName=(\\w+),\\s+" +
+          "startRow=(\\d+),\\s+" +
+          "perClientRunRows=(\\d+),\\s+" +
+          "totalRows=(\\d+),\\s+" +
+          "clients=(\\d+),\\s+" +
+          "flushCommits=(\\w+),\\s+" +
+          "writeToWAL=(\\w+),\\s+" +
+          "useTags=(\\w+),\\s+" +
+          "noOfTags=(\\d+)");
 
   /**
    * Enum for map metrics.  Keep it out here rather than inside in the Map
@@ -173,13 +193,13 @@ public class PerformanceEvaluation  {
         "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)");
   }
 
-  protected void addCommandDescriptor(Class<? extends Test> cmdClass, 
+  protected void addCommandDescriptor(Class<? extends Test> cmdClass,
       String name, String description) {
-    CmdDescriptor cmdDescriptor = 
+    CmdDescriptor cmdDescriptor =
       new CmdDescriptor(cmdClass, name, description);
     commands.put(name, cmdDescriptor);
   }
-  
+
   /**
    * Implementations can have their status set.
    */
@@ -191,7 +211,7 @@ public class PerformanceEvaluation  {
      */
     void setStatus(final String msg) throws IOException;
   }
-  
+
   /**
    *  This class works as the InputSplit of Performance Evaluation
    *  MapReduce InputFormat, and the Record Value of RecordReader. 
@@ -199,81 +219,113 @@ public class PerformanceEvaluation  {
    *  the record value is the PeInputSplit itself.
    */
   public static class PeInputSplit extends InputSplit implements Writable {
+    private TableName tableName = TABLE_NAME;
     private int startRow = 0;
     private int rows = 0;
     private int totalRows = 0;
     private int clients = 0;
-    private int rowsPerPut = 1;
+    private boolean flushCommits = false;
+    private boolean writeToWAL = true;
+    private boolean useTags = false;
+    private int noOfTags = 0;
 
     public PeInputSplit() {
-      this.startRow = 0;
-      this.rows = 0;
-      this.totalRows = 0;
-      this.clients = 0;
-      this.rowsPerPut = 1;
     }
 
-    public PeInputSplit(int startRow, int rows, int totalRows, int clients,
-        int rowsPerPut) {
+    public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows, int clients,
+        boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags) {
+      this.tableName = tableName;
       this.startRow = startRow;
       this.rows = rows;
       this.totalRows = totalRows;
       this.clients = clients;
-      this.rowsPerPut = 1;
+      this.flushCommits = flushCommits;
+      this.writeToWAL = writeToWAL;
+      this.useTags = useTags;
+      this.noOfTags = noOfTags;
     }
-    
+
     @Override
     public void readFields(DataInput in) throws IOException {
+      int tableNameLen = in.readInt();
+      byte[] name = new byte[tableNameLen];
+      in.readFully(name);
+      this.tableName = TableName.valueOf(name);
       this.startRow = in.readInt();
       this.rows = in.readInt();
       this.totalRows = in.readInt();
       this.clients = in.readInt();
-      this.rowsPerPut = in.readInt();
+      this.flushCommits = in.readBoolean();
+      this.writeToWAL = in.readBoolean();
+      this.useTags = in.readBoolean();
+      this.noOfTags = in.readInt();
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
+      byte[] name = this.tableName.toBytes();
+      out.writeInt(name.length);
+      out.write(name);
       out.writeInt(startRow);
       out.writeInt(rows);
       out.writeInt(totalRows);
       out.writeInt(clients);
-      out.writeInt(rowsPerPut);
+      out.writeBoolean(flushCommits);
+      out.writeBoolean(writeToWAL);
+      out.writeBoolean(useTags);
+      out.writeInt(noOfTags);
     }
-    
+
     @Override
     public long getLength() throws IOException, InterruptedException {
       return 0;
     }
-  
+
     @Override
     public String[] getLocations() throws IOException, InterruptedException {
       return new String[0];
     }
-    
+
     public int getStartRow() {
       return startRow;
     }
-    
+
+    public TableName getTableName() {
+      return tableName;
+    }
+
     public int getRows() {
       return rows;
     }
-    
+
     public int getTotalRows() {
       return totalRows;
     }
-    
+
     public int getClients() {
       return clients;
     }
 
-    public int getRowsPerPut() {
-      return rowsPerPut;
+    public boolean isFlushCommits() {
+      return flushCommits;
+    }
+
+    public boolean isWriteToWAL() {
+      return writeToWAL;
+    }
+
+    public boolean isUseTags() {
+      return useTags;
+    }
+
+    public int getNoOfTags() {
+      return noOfTags;
     }
   }
 
   /**
    *  InputFormat of Performance Evaluation MapReduce job.
-   *  It extends from FileInputFormat, want to use it's methods such as setInputPaths(). 
+   *  It extends FileInputFormat in order to use its methods, such as setInputPaths().
    */
   public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> {
 
@@ -281,8 +333,11 @@ public class PerformanceEvaluation  {
     public List<InputSplit> getSplits(JobContext job) throws IOException {
       // generate splits
       List<InputSplit> splitList = new ArrayList<InputSplit>();
-      
+
       for (FileStatus file: listStatus(job)) {
+        if (file.isDir()) {
+          continue;
+        }
         Path path = file.getPath();
         FileSystem fs = path.getFileSystem(job.getConfiguration());
         FSDataInputStream fileIn = fs.open(path);
@@ -296,73 +351,82 @@ public class PerformanceEvaluation  {
           }
           Matcher m = LINE_PATTERN.matcher(lineText.toString());
           if((m != null) && m.matches()) {
-            int startRow = Integer.parseInt(m.group(1));
-            int rows = Integer.parseInt(m.group(2));
-            int totalRows = Integer.parseInt(m.group(3));
-            int clients = Integer.parseInt(m.group(4));
-            int rowsPerPut = Integer.parseInt(m.group(5));
-
-            LOG.debug("split["+ splitList.size() + "] " + 
-                     " startRow=" + startRow +
-                     " rows=" + rows +
-                     " totalRows=" + totalRows +
-                     " clients=" + clients +
-                     " rowsPerPut=" + rowsPerPut);
+            TableName tableName = TableName.valueOf(m.group(1));
+            int startRow = Integer.parseInt(m.group(2));
+            int rows = Integer.parseInt(m.group(3));
+            int totalRows = Integer.parseInt(m.group(4));
+            int clients = Integer.parseInt(m.group(5));
+            boolean flushCommits = Boolean.parseBoolean(m.group(6));
+            boolean writeToWAL = Boolean.parseBoolean(m.group(7));
+            boolean useTags = Boolean.parseBoolean(m.group(8));
+            int noOfTags = Integer.parseInt(m.group(9));
+
+            LOG.debug("tableName=" + tableName +
+                      " split["+ splitList.size() + "] " +
+                      " startRow=" + startRow +
+                      " rows=" + rows +
+                      " totalRows=" + totalRows +
+                      " clients=" + clients +
+                      " flushCommits=" + flushCommits +
+                      " writeToWAL=" + writeToWAL +
+                      " useTags=" + useTags +
+                      " noOfTags=" + noOfTags);
 
             PeInputSplit newSplit =
-              new PeInputSplit(startRow, rows, totalRows, clients, rowsPerPut);
+              new PeInputSplit(tableName, startRow, rows, totalRows, clients,
+                  flushCommits, writeToWAL, useTags, noOfTags);
             splitList.add(newSplit);
           }
         }
         in.close();
       }
-      
+
       LOG.info("Total # of splits: " + splitList.size());
       return splitList;
     }
-    
+
     @Override
     public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split,
                             TaskAttemptContext context) {
       return new PeRecordReader();
     }
-    
+
     public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> {
       private boolean readOver = false;
       private PeInputSplit split = null;
       private NullWritable key = null;
       private PeInputSplit value = null;
-      
+
       @Override
-      public void initialize(InputSplit split, TaskAttemptContext context) 
+      public void initialize(InputSplit split, TaskAttemptContext context)
                   throws IOException, InterruptedException {
         this.readOver = false;
         this.split = (PeInputSplit)split;
       }
-      
+
       @Override
       public boolean nextKeyValue() throws IOException, InterruptedException {
         if(readOver) {
           return false;
         }
-        
+
         key = NullWritable.get();
         value = (PeInputSplit)split;
-        
+
         readOver = true;
         return true;
       }
-      
+
       @Override
       public NullWritable getCurrentKey() throws IOException, InterruptedException {
         return key;
       }
-      
+
       @Override
       public PeInputSplit getCurrentValue() throws IOException, InterruptedException {
         return value;
       }
-      
+
       @Override
       public float getProgress() throws IOException, InterruptedException {
         if(readOver) {
@@ -371,18 +435,18 @@ public class PerformanceEvaluation  {
           return 0.0f;
         }
       }
-      
+
       @Override
       public void close() throws IOException {
         // do nothing
       }
     }
   }
-  
+
   /**
    * MapReduce job that runs a performance evaluation client in each map task.
    */
-  public static class EvaluationMapTask 
+  public static class EvaluationMapTask
       extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> {
 
     /** configuration parameter name that contains the command */
@@ -419,18 +483,22 @@ public class PerformanceEvaluation  {
       return clazz;
     }
 
-    protected void map(NullWritable key, PeInputSplit value, final Context context) 
+    protected void map(NullWritable key, PeInputSplit value, final Context context)
            throws IOException, InterruptedException {
-      
+
       Status status = new Status() {
         public void setStatus(String msg) {
-           context.setStatus(msg); 
+           context.setStatus(msg);
         }
       };
-      
+
       // Evaluation task
+      pe.tableName = value.getTableName();
       long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
-        value.getRows(), value.getTotalRows(), value.getRowsPerPut(), status);
+        value.getRows(), value.getTotalRows(),
+        value.isFlushCommits(), value.isWriteToWAL(),
+        value.isUseTags(), value.getNoOfTags(),
+        HConnectionManager.createConnection(context.getConfiguration()), status);
       // Collect how much time the thing took. Report as map output and
       // to the ELAPSED_TIME counter.
       context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
@@ -439,27 +507,71 @@ public class PerformanceEvaluation  {
       context.progress();
     }
   }
-  
+
   /*
    * If table does not already exist, create.
    * @param c Client to use checking.
    * @return True if we created the table.
    * @throws IOException
    */
-  private boolean checkTable() throws IOException {
+  private boolean checkTable(RemoteAdmin admin) throws IOException {
     HTableDescriptor tableDescriptor = getTableDescriptor();
-    RemoteAdmin admin = new RemoteAdmin(new Client(cluster), conf);
-    if (!admin.isTableAvailable(tableDescriptor.getTableName().getName())) {
+    if (this.presplitRegions > 0) {
+      // presplit requested
+      if (admin.isTableAvailable(tableDescriptor.getTableName().getName())) {
+        admin.deleteTable(tableDescriptor.getTableName().getName());
+      }
+
+      byte[][] splits = getSplits();
+      for (int i=0; i < splits.length; i++) {
+        LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
+      }
       admin.createTable(tableDescriptor);
-      return true;
+      LOG.info ("Table created with " + this.presplitRegions + " splits");
+    } else {
+      boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName());
+      if (!tableExists) {
+        admin.createTable(tableDescriptor);
+        LOG.info("Table " + tableDescriptor + " created");
+      }
     }
-    return false;
+    boolean tableExists = admin.isTableAvailable(tableDescriptor.getTableName().getName());
+    return tableExists;
   }
 
   protected HTableDescriptor getTableDescriptor() {
+    if (TABLE_DESCRIPTOR == null) {
+      TABLE_DESCRIPTOR = new HTableDescriptor(tableName);
+      HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
+      family.setDataBlockEncoding(blockEncoding);
+      family.setCompressionType(compression);
+      if (inMemoryCF) {
+        family.setInMemory(true);
+      }
+      TABLE_DESCRIPTOR.addFamily(family);
+    }
     return TABLE_DESCRIPTOR;
   }
 
+  /**
+   * Generates splits based on total number of rows and specified split regions
+   *
+   * @return splits : array of byte []
+   */
+  protected  byte[][] getSplits() {
+    if (this.presplitRegions == 0)
+      return new byte [0][];
+
+    int numSplitPoints = presplitRegions - 1;
+    byte[][] splits = new byte[numSplitPoints][];
+    int jump = this.R  / this.presplitRegions;
+    for (int i=0; i < numSplitPoints; i++) {
+      int rowkey = jump * (1 + i);
+      splits[i] = format(rowkey);
+    }
+    return splits;
+  }
+
   /*
    * We're to run multiple clients concurrently.  Setup a mapreduce job.  Run
    * one map per client.  Then run a single reduce to sum the elapsed times.
@@ -468,36 +580,59 @@ public class PerformanceEvaluation  {
    */
   private void runNIsMoreThanOne(final Class<? extends Test> cmd)
   throws IOException, InterruptedException, ClassNotFoundException {
-    checkTable();
+    RemoteAdmin remoteAdmin = new RemoteAdmin(new Client(cluster), getConf());
+    checkTable(remoteAdmin);
     if (nomapred) {
       doMultipleClients(cmd);
     } else {
       doMapReduce(cmd);
     }
   }
-  
+
   /*
    * Run all clients in this vm each to its own thread.
    * @param cmd Command to run.
    * @throws IOException
    */
   private void doMultipleClients(final Class<? extends Test> cmd) throws IOException {
-    final List<Thread> threads = new ArrayList<Thread>(N);
+    final List<Thread> threads = new ArrayList<Thread>(this.N);
+    final long[] timings = new long[this.N];
     final int perClientRows = R/N;
-    for (int i = 0; i < N; i++) {
-      Thread t = new Thread (Integer.toString(i)) {
+    final TableName tableName = this.tableName;
+    final DataBlockEncoding encoding = this.blockEncoding;
+    final boolean flushCommits = this.flushCommits;
+    final Compression.Algorithm compression = this.compression;
+    final boolean writeToWal = this.writeToWAL;
+    final int preSplitRegions = this.presplitRegions;
+    final boolean useTags = this.useTags;
+    final int numTags = this.noOfTags;
+    final HConnection connection = HConnectionManager.createConnection(getConf());
+    for (int i = 0; i < this.N; i++) {
+      final int index = i;
+      Thread t = new Thread ("TestClient-" + i) {
         @Override
         public void run() {
           super.run();
-          PerformanceEvaluation pe = new PerformanceEvaluation(conf);
-          int index = Integer.parseInt(getName());
+          PerformanceEvaluation pe = new PerformanceEvaluation(getConf());
+          pe.tableName = tableName;
+          pe.blockEncoding = encoding;
+          pe.flushCommits = flushCommits;
+          pe.compression = compression;
+          pe.writeToWAL = writeToWal;
+          pe.presplitRegions = preSplitRegions;
+          pe.N = N;
+          pe.connection = connection;
+          pe.useTags = useTags;
+          pe.noOfTags = numTags;
           try {
             long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
-              perClientRows, R, B, new Status() {
-                  public void setStatus(final String msg) throws IOException {
-                    LOG.info("client-" + getName() + " " + msg);
-                  }
-                });
+                perClientRows, R,
+                 flushCommits, writeToWAL, useTags, noOfTags, connection, new Status() {
+                   public void setStatus(final String msg) throws IOException {
+                     LOG.info("client-" + getName() + " " + msg);
+                   }
+                 });
+            timings[index] = elapsedTime;
             LOG.info("Finished " + getName() + " in " + elapsedTime +
               "ms writing " + perClientRows + " rows");
           } catch (IOException e) {
@@ -519,8 +654,20 @@ public class PerformanceEvaluation  {
         }
       }
     }
+    final String test = cmd.getSimpleName();
+    LOG.info("[" + test + "] Summary of timings (ms): "
+             + Arrays.toString(timings));
+    Arrays.sort(timings);
+    long total = 0;
+    for (int i = 0; i < this.N; i++) {
+      total += timings[i];
+    }
+    LOG.info("[" + test + "]"
+             + "\tMin: " + timings[0] + "ms"
+             + "\tMax: " + timings[this.N - 1] + "ms"
+             + "\tAvg: " + (total / this.N) + "ms");
   }
-  
+
   /*
    * Run a mapreduce job.  Run as many maps as asked-for clients.
    * Before we start up the job, write out an input file with instruction
@@ -530,30 +677,31 @@ public class PerformanceEvaluation  {
    */
   private void doMapReduce(final Class<? extends Test> cmd) throws IOException,
         InterruptedException, ClassNotFoundException {
-    Path inputDir = writeInputFile(this.conf);
-    this.conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
-    this.conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
-    Job job = new Job(this.conf);
+    Configuration conf = getConf();
+    Path inputDir = writeInputFile(conf);
+    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
+    conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
+    Job job = new Job(conf);
     job.setJarByClass(PerformanceEvaluation.class);
     job.setJobName("HBase Performance Evaluation");
-    
+
     job.setInputFormatClass(PeInputFormat.class);
     PeInputFormat.setInputPaths(job, inputDir);
-    
+
     job.setOutputKeyClass(LongWritable.class);
     job.setOutputValueClass(LongWritable.class);
-    
+
     job.setMapperClass(EvaluationMapTask.class);
     job.setReducerClass(LongSumReducer.class);
-        
     job.setNumReduceTasks(1);
-    
+
     job.setOutputFormatClass(TextOutputFormat.class);
-    TextOutputFormat.setOutputPath(job, new Path(inputDir,"outputs"));
-    
+    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
+    TableMapReduceUtil.addDependencyJars(job);
+    TableMapReduceUtil.initCredentials(job);
     job.waitForCompletion(true);
   }
-  
+
   /*
    * Write input file of offsets-per-client for the mapreduce job.
    * @param c Configuration
@@ -561,27 +709,30 @@ public class PerformanceEvaluation  {
    * @throws IOException
    */
   private Path writeInputFile(final Configuration c) throws IOException {
-    FileSystem fs = FileSystem.get(c);
-    if (!fs.exists(PERF_EVAL_DIR)) {
-      fs.mkdirs(PERF_EVAL_DIR);
-    }
     SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
-    Path subdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
-    fs.mkdirs(subdir);
-    Path inputFile = new Path(subdir, "input.txt");
+    Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
+    Path inputDir = new Path(jobdir, "inputs");
+
+    FileSystem fs = FileSystem.get(c);
+    fs.mkdirs(inputDir);
+    Path inputFile = new Path(inputDir, "input.txt");
     PrintStream out = new PrintStream(fs.create(inputFile));
     // Make input random.
     Map<Integer, String> m = new TreeMap<Integer, String>();
     Hash h = MurmurHash.getInstance();
-    int perClientRows = (R / N);
+    int perClientRows = (this.R / this.N);
     try {
       for (int i = 0; i < 10; i++) {
         for (int j = 0; j < N; j++) {
-          String s = "startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) +
+          String s = "tableName=" + this.tableName +
+          ", startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) +
           ", perClientRunRows=" + (perClientRows / 10) +
-          ", totalRows=" + R +
-          ", clients=" + N +
-          ", rowsPerPut=" + B;
+          ", totalRows=" + this.R +
+          ", clients=" + this.N +
+          ", flushCommits=" + this.flushCommits +
+          ", writeToWAL=" + this.writeToWAL +
+          ", useTags=" + this.useTags +
+          ", noOfTags=" + this.noOfTags;
           int hash = h.hash(Bytes.toBytes(s));
           m.put(hash, s);
         }
@@ -592,7 +743,7 @@ public class PerformanceEvaluation  {
     } finally {
       out.close();
     }
-    return subdir;
+    return inputDir;
   }
 
   /**
@@ -630,18 +781,30 @@ public class PerformanceEvaluation  {
     private int startRow;
     private int perClientRunRows;
     private int totalRows;
-    private byte[] tableName;
-    private int rowsPerPut;
+    private int numClientThreads;
+    private TableName tableName;
+    private boolean flushCommits;
+    private boolean writeToWAL = true;
+    private boolean useTags = false;
+    private int noOfTags = 0;
+    private HConnection connection;
 
     TestOptions() {
     }
 
-    TestOptions(int startRow, int perClientRunRows, int totalRows, byte[] tableName, int rowsPerPut) {
+    TestOptions(int startRow, int perClientRunRows, int totalRows, int numClientThreads,
+        TableName tableName, boolean flushCommits, boolean writeToWAL, boolean useTags,
+        int noOfTags, HConnection connection) {
       this.startRow = startRow;
       this.perClientRunRows = perClientRunRows;
       this.totalRows = totalRows;
+      this.numClientThreads = numClientThreads;
       this.tableName = tableName;
-      this.rowsPerPut = rowsPerPut;
+      this.flushCommits = flushCommits;
+      this.writeToWAL = writeToWAL;
+      this.useTags = useTags;
+      this.noOfTags = noOfTags;
+      this.connection = connection;
     }
 
     public int getStartRow() {
@@ -656,12 +819,32 @@ public class PerformanceEvaluation  {
       return totalRows;
     }
 
-    public byte[] getTableName() {
+    public int getNumClientThreads() {
+      return numClientThreads;
+    }
+
+    public TableName getTableName() {
       return tableName;
     }
 
-    public int getRowsPerPut() {
-      return rowsPerPut;
+    public boolean isFlushCommits() {
+      return flushCommits;
+    }
+
+    public boolean isWriteToWAL() {
+      return writeToWAL;
+    }
+
+    public HConnection getConnection() {
+      return connection;
+    }
+
+    public boolean isUseTags() {
+      return this.useTags;
+    }
+
+    public int getNumTags() {
+      return this.noOfTags;
     }
   }
 
@@ -671,7 +854,7 @@ public class PerformanceEvaluation  {
    */
   static abstract class Test {
     // Below is make it so when Tests are all running in the one
-    // jvm, that they each have a differently seeded Random. 
+    // jvm, that they each have a differently seeded Random.
     private static final Random randomSeed =
       new Random(System.currentTimeMillis());
     private static long nextRandomSeed() {
@@ -682,10 +865,15 @@ public class PerformanceEvaluation  {
     protected final int startRow;
     protected final int perClientRunRows;
     protected final int totalRows;
-    protected final Status status;
-    protected byte[] tableName;
-    protected RemoteHTable table;
+    private final Status status;
+    protected TableName tableName;
+    protected HTableInterface table;
     protected volatile Configuration conf;
+    protected boolean flushCommits;
+    protected boolean writeToWAL;
+    protected boolean useTags;
+    protected int noOfTags;
+    protected HConnection connection;
 
     /**
      * Note that all subclasses of this class must provide a public contructor
@@ -700,41 +888,49 @@ public class PerformanceEvaluation  {
       this.tableName = options.getTableName();
       this.table = null;
       this.conf = conf;
+      this.flushCommits = options.isFlushCommits();
+      this.writeToWAL = options.isWriteToWAL();
+      this.useTags = options.isUseTags();
+      this.noOfTags = options.getNumTags();
+      this.connection = options.getConnection();
     }
-    
+
     protected String generateStatus(final int sr, final int i, final int lr) {
       return sr + "/" + i + "/" + lr;
     }
-    
+
     protected int getReportingPeriod() {
       int period = this.perClientRunRows / 10;
       return period == 0? this.perClientRunRows: period;
     }
-    
+
     void testSetup() throws IOException {
-      this.table = new RemoteHTable(new Client(cluster), conf, tableName);
+      this.table = connection.getTable(tableName);
+      this.table.setAutoFlush(false, true);
     }
 
     void testTakedown()  throws IOException {
-      this.table.close();
+      if (flushCommits) {
+        this.table.flushCommits();
+      }
+      table.close();
     }
-    
+
     /*
      * Run test
      * @return Elapsed time.
      * @throws IOException
      */
     long test() throws IOException {
-      long elapsedTime;
       testSetup();
-      long startTime = System.currentTimeMillis();
+      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
+      final long startTime = System.nanoTime();
       try {
         testTimed();
-        elapsedTime = System.currentTimeMillis() - startTime;
       } finally {
         testTakedown();
       }
-      return elapsedTime;
+      return (System.nanoTime() - startTime) / 1000000;
     }
 
     /**
@@ -755,8 +951,7 @@ public class PerformanceEvaluation  {
     * Test for individual row.
     * @param i Row index.
     */
-    void testRow(final int i) throws IOException {
-    }
+    abstract void testRow(final int i) throws IOException;
   }
 
   @SuppressWarnings("unused")
@@ -771,13 +966,9 @@ public class PerformanceEvaluation  {
       scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
       scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
       ResultScanner s = this.table.getScanner(scan);
-      //int count = 0;
-      for (Result rr = null; (rr = s.next()) != null;) {
-        // LOG.info("" + count++ + " " + rr.toString());
-      }
       s.close();
     }
- 
+
     @Override
     protected int getReportingPeriod() {
       int period = this.perClientRunRows / 100;
@@ -890,48 +1081,40 @@ public class PerformanceEvaluation  {
     }
 
   }
-  
-  static class RandomWriteTest extends Test {
-    int rowsPerPut;
 
+  static class RandomWriteTest extends Test {
     RandomWriteTest(Configuration conf, TestOptions options, Status status) {
       super(conf, options, status);
-      rowsPerPut = options.getRowsPerPut();
     }
-    
+
     @Override
-    void testTimed() throws IOException {
-      int lastRow = this.startRow + this.perClientRunRows;
-      // Report on completion of 1/10th of total.
-      List<Put> puts = new ArrayList<Put>();
-      for (int i = this.startRow; i < lastRow; i += rowsPerPut) {
-        for (int j = 0; j < rowsPerPut; j++) {
-          byte [] row = getRandomRow(this.rand, this.totalRows);
-          Put put = new Put(row);
-          byte[] value = generateValue(this.rand);
-          put.add(FAMILY_NAME, QUALIFIER_NAME, value);
-          puts.add(put);
-          if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
-            status.setStatus(generateStatus(this.startRow, i, lastRow));
-          }
+    void testRow(final int i) throws IOException {
+      byte[] row = getRandomRow(this.rand, this.totalRows);
+      Put put = new Put(row);
+      byte[] value = generateData(this.rand, ROW_LENGTH);
+      if (useTags) {
+        byte[] tag = generateData(this.rand, TAG_LENGTH);
+        Tag[] tags = new Tag[noOfTags];
+        for (int n = 0; n < noOfTags; n++) {
+          Tag t = new Tag((byte) n, tag);
+          tags[n] = t;
         }
-        table.put(puts);
+        put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags);
+      } else {
+        put.add(FAMILY_NAME, QUALIFIER_NAME, value);
       }
+      put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+      table.put(put);
     }
   }
-  
+
   static class ScanTest extends Test {
     private ResultScanner testScanner;
 
     ScanTest(Configuration conf, TestOptions options, Status status) {
       super(conf, options, status);
     }
-    
-    @Override
-    void testSetup() throws IOException {
-      super.testSetup();
-    }
-    
+
     @Override
     void testTakedown() throws IOException {
       if (this.testScanner != null) {
@@ -939,8 +1122,8 @@ public class PerformanceEvaluation  {
       }
       super.testTakedown();
     }
-    
-    
+
+
     @Override
     void testRow(final int i) throws IOException {
       if (this.testScanner == null) {
@@ -952,12 +1135,12 @@ public class PerformanceEvaluation  {
     }
 
   }
-  
+
   static class SequentialReadTest extends Test {
     SequentialReadTest(Configuration conf, TestOptions options, Status status) {
       super(conf, options, status);
     }
-    
+
     @Override
     void testRow(final int i) throws IOException {
       Get get = new Get(format(i));
@@ -966,32 +1149,30 @@ public class PerformanceEvaluation  {
     }
 
   }
-  
+
   static class SequentialWriteTest extends Test {
-    int rowsPerPut;
 
     SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
       super(conf, options, status);
-      rowsPerPut = options.getRowsPerPut();
     }
 
     @Override
-    void testTimed() throws IOException {
-      int lastRow = this.startRow + this.perClientRunRows;
-      // Report on completion of 1/10th of total.
-      List<Put> puts = new ArrayList<Put>();
-      for (int i = this.startRow; i < lastRow; i += rowsPerPut) {
-        for (int j = 0; j < rowsPerPut; j++) {
-          Put put = new Put(format(i + j));
-          byte[] value = generateValue(this.rand);
-          put.add(FAMILY_NAME, QUALIFIER_NAME, value);
-          puts.add(put);
-          if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
-            status.setStatus(generateStatus(this.startRow, i, lastRow));
-          }
+    void testRow(final int i) throws IOException {
+      Put put = new Put(format(i));
+      byte[] value = generateData(this.rand, ROW_LENGTH);
+      if (useTags) {
+        byte[] tag = generateData(this.rand, TAG_LENGTH);
+        Tag[] tags = new Tag[noOfTags];
+        for (int n = 0; n < noOfTags; n++) {
+          Tag t = new Tag((byte) n, tag);
+          tags[n] = t;
         }
-        table.put(puts);
+        put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags);
+      } else {
+        put.add(FAMILY_NAME, QUALIFIER_NAME, value);
       }
+      put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+      table.put(put);
     }
   }
 
@@ -1027,7 +1208,7 @@ public class PerformanceEvaluation  {
       return scan;
     }
   }
-  
+
   /*
    * Format passed integer.
    * @param number
@@ -1035,7 +1216,7 @@ public class PerformanceEvaluation  {
    * number (Does absolute in case number is negative).
    */
   public static byte [] format(final int number) {
-    byte [] b = new byte[10];
+    byte [] b = new byte[DEFAULT_ROW_PREFIX_LENGTH + 10];
     int d = Math.abs(number);
     for (int i = b.length - 1; i >= 0; i--) {
       b[i] = (byte)((d % 10) + '0');
@@ -1043,34 +1224,51 @@ public class PerformanceEvaluation  {
     }
     return b;
   }
-  
-  /*
-   * This method takes some time and is done inline uploading data.  For
-   * example, doing the mapfile test, generation of the key and value
-   * consumes about 30% of CPU time.
-   * @return Generated random value to insert into a table cell.
-   */
+
+  public static byte[] generateData(final Random r, int length) {
+    byte [] b = new byte [length];
+    int i = 0;
+
+    for(i = 0; i < (length-8); i += 8) {
+      b[i] = (byte) (65 + r.nextInt(26));
+      b[i+1] = b[i];
+      b[i+2] = b[i];
+      b[i+3] = b[i];
+      b[i+4] = b[i];
+      b[i+5] = b[i];
+      b[i+6] = b[i];
+      b[i+7] = b[i];
+    }
+
+    byte a = (byte) (65 + r.nextInt(26));
+    for(; i < length; i++) {
+      b[i] = a;
+    }
+    return b;
+  }
+
   public static byte[] generateValue(final Random r) {
     byte [] b = new byte [ROW_LENGTH];
     r.nextBytes(b);
     return b;
   }
-  
+
   static byte [] getRandomRow(final Random random, final int totalRows) {
     return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
   }
-  
+
   long runOneClient(final Class<? extends Test> cmd, final int startRow,
-                    final int perClientRunRows, final int totalRows, 
-                    final int rowsPerPut, final Status status)
+      final int perClientRunRows, final int totalRows,
+      boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags,
+      HConnection connection, final Status status)
   throws IOException {
     status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
       perClientRunRows + " rows");
     long totalElapsedTime = 0;
 
-    Test t = null;
     TestOptions options = new TestOptions(startRow, perClientRunRows,
-      totalRows, getTableDescriptor().getTableName().getName(), rowsPerPut);
+      totalRows, N, tableName, flushCommits, writeToWAL, useTags, noOfTags, connection);
+    final Test t;
     try {
       Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
           Configuration.class, TestOptions.class, Status.class);
@@ -1089,7 +1287,7 @@ public class PerformanceEvaluation  {
       "ms at offset " + startRow + " for " + perClientRunRows + " rows");
     return totalElapsedTime;
   }
-  
+
   private void runNIsOne(final Class<? extends Test> cmd) {
     Status status = new Status() {
       public void setStatus(String msg) throws IOException {
@@ -1097,12 +1295,16 @@ public class PerformanceEvaluation  {
       }
     };
 
+    RemoteAdmin admin = null;
     try {
-      checkTable();
-      runOneClient(cmd, 0, R, R, B, status);
+      Client client = new Client(cluster);
+      admin = new RemoteAdmin(client, getConf());
+      checkTable(admin);
+      runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL,
+        this.useTags, this.noOfTags, this.connection, status);
     } catch (Exception e) {
       LOG.error("Failed", e);
-    } 
+    }
   }
 
   private void runTest(final Class<? extends Test> cmd) throws IOException,
@@ -1112,7 +1314,7 @@ public class PerformanceEvaluation  {
       // has been set up at all.
       runNIsOne(cmd);
     } else {
-      // Else, run 
+      // Else, run
       runNIsMoreThanOne(cmd);
     }
   }
@@ -1120,20 +1322,35 @@ public class PerformanceEvaluation  {
   protected void printUsage() {
     printUsage(null);
   }
-  
+
   protected void printUsage(final String message) {
     if (message != null && message.length() > 0) {
       System.err.println(message);
     }
     System.err.println("Usage: java " + this.getClass().getName() + " \\");
-    System.err.println("  [--option] [--option=value] <command> <nclients>");
+    System.err.println("  [--nomapred] [--rows=ROWS] [--table=NAME] \\");
+    System.err.println("  [--compress=TYPE] [--blockEncoding=TYPE] [-D<property=value>]* <command> <nclients>");
     System.err.println();
     System.err.println("Options:");
-    System.err.println(" host          String. Specify Stargate endpoint.");
-    System.err.println(" rows          Integer. Rows each client runs. Default: One million");
-    System.err.println(" rowsPerPut    Integer. Rows each Stargate (multi)Put. Default: 100");
-    System.err.println(" nomapred      (Flag) Run multiple clients using threads " +
+    System.err.println(" nomapred        Run multiple clients using threads " +
       "(rather than use mapreduce)");
+    System.err.println(" rows            Rows each client runs. Default: One million");
+    System.err.println(" table           Alternate table name. Default: 'TestTable'");
+    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
+    System.err.println(" flushCommits    Used to determine if the test should flush the table.  Default: false");
+    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
+    System.err.println(" presplit        Create presplit table. Recommended for accurate perf analysis (see guide).  Default: disabled");
+    System.err
+        .println(" inmemory        Tries to keep the HFiles of the CF inmemory as far as possible.  Not " +
+            "guaranteed that reads are always served from inmemory.  Default: false");
+    System.err.println(" usetags         Writes tags along with KVs.  Use with HFile V3.  Default : false");
+    System.err
+        .println(" nooftags        Specify the no of tags that would be needed.  This works only if usetags is true.");
+    System.err.println();
+    System.err.println(" Note: -D properties will be applied to the conf used. ");
+    System.err.println("  For example: ");
+    System.err.println("   -Dmapred.output.compress=true");
+    System.err.println("   -Dmapreduce.task.timeout=60000");
     System.err.println();
     System.err.println("Command:");
     for (CmdDescriptor command : commands.values()) {
@@ -1161,10 +1378,11 @@ public class PerformanceEvaluation  {
     // Set total number of rows to write.
     R = R * N;
   }
-  
-  public int doCommandLine(final String[] args) {
+
+  @Override
+  public int run(String[] args) throws Exception {
     // Process command-line args. TODO: Better cmd-line processing
-    // (but hopefully something not as painful as cli options).    
+    // (but hopefully something not as painful as cli options).
     int errCode = -1;
     if (args.length < 1) {
       printUsage();
@@ -1174,27 +1392,77 @@ public class PerformanceEvaluation  {
     try {
       for (int i = 0; i < args.length; i++) {
         String cmd = args[i];
-        if (cmd.equals("-h")) {
+        if (cmd.equals("-h") || cmd.startsWith("--h")) {
           printUsage();
           errCode = 0;
           break;
         }
-       
+
         final String nmr = "--nomapred";
         if (cmd.startsWith(nmr)) {
           nomapred = true;
           continue;
         }
-        
+
         final String rows = "--rows=";
         if (cmd.startsWith(rows)) {
           R = Integer.parseInt(cmd.substring(rows.length()));
           continue;
         }
 
-        final String rowsPerPut = "--rowsPerPut=";
-        if (cmd.startsWith(rowsPerPut)) {
-          this.B = Integer.parseInt(cmd.substring(rowsPerPut.length()));
+        final String table = "--table=";
+        if (cmd.startsWith(table)) {
+          this.tableName = TableName.valueOf(cmd.substring(table.length()));
+          continue;
+        }
+
+        final String compress = "--compress=";
+        if (cmd.startsWith(compress)) {
+          this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
+          continue;
+        }
+
+        final String blockEncoding = "--blockEncoding=";
+        if (cmd.startsWith(blockEncoding)) {
+          this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
+          continue;
+        }
+
+        final String flushCommits = "--flushCommits=";
+        if (cmd.startsWith(flushCommits)) {
+          this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
+          continue;
+        }
+
+        final String writeToWAL = "--writeToWAL=";
+        if (cmd.startsWith(writeToWAL)) {
+          this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
+          continue;
+        }
+
+        final String presplit = "--presplit=";
+        if (cmd.startsWith(presplit)) {
+          this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
+          continue;
+        }
+
+        final String inMemory = "--inmemory=";
+        if (cmd.startsWith(inMemory)) {
+          this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
+          continue;
+        }
+        if (this.connection == null) { // create once, not once per remaining argument
+          this.connection = HConnectionManager.createConnection(getConf());
+        }
+        final String useTags = "--usetags=";
+        if (cmd.startsWith(useTags)) {
+          this.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
+          continue;
+        }
+
+        final String noOfTags = "--nooftags=";
+        if (cmd.startsWith(noOfTags)) {
+          this.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
           continue;
         }
 
@@ -1219,14 +1487,14 @@ public class PerformanceEvaluation  {
           errCode = 0;
           break;
         }
-    
+
         printUsage();
         break;
       }
     } catch (Exception e) {
-      e.printStackTrace();
+      LOG.error("Failed", e);
     }
-    
+
     return errCode;
   }
 
@@ -1238,8 +1506,8 @@ public class PerformanceEvaluation  {
   /**
    * @param args
    */
-  public static void main(final String[] args) {
-    Configuration c = HBaseConfiguration.create();
-    System.exit(new PerformanceEvaluation(c).doCommandLine(args));
+  public static void main(final String[] args) throws Exception {
+    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
+    System.exit(res);
   }
 }