You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2016/02/11 02:47:16 UTC

[2/3] hbase git commit: HBASE-15246 Backport branch-1 HBasePerformanceEvaluation to 0.98

http://git-wip-us.apache.org/repos/asf/hbase/blob/38ce707b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index ffa5150..1c84c30 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -30,7 +30,9 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
@@ -39,26 +41,32 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import com.google.common.base.Objects;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterAllFilter;
 import org.apache.hadoop.hbase.filter.FilterList;
@@ -67,12 +75,11 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Hash;
-import org.apache.hadoop.hbase.util.MurmurHash;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.trace.SpanReceiverHost;
+import org.apache.hadoop.hbase.util.*;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -82,39 +89,49 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.cloudera.htrace.Sampler;
+import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
+import org.cloudera.htrace.impl.ProbabilitySampler;
 import org.codehaus.jackson.map.ObjectMapper;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.stats.UniformSample;
+import com.yammer.metrics.stats.Snapshot;
 
 /**
  * Script used evaluating HBase performance and scalability.  Runs a HBase
  * client that steps through one of a set of hardcoded tests or 'experiments'
  * (e.g. a random reads test, a random writes test, etc.). Pass on the
  * command-line which test to run and how many clients are participating in
- * this experiment. Run <code>java PerformanceEvaluation --help</code> to
- * obtain usage.
+ * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
  *
  * <p>This class sets up and runs the evaluation programs described in
  * Section 7, <i>Performance Evaluation</i>, of the <a
  * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
  * paper, pages 8-10.
  *
- * <p>If number of clients > 1, we start up a MapReduce job. Each map task
- * runs an individual client. Each client does about 1GB of data.
+ * <p>By default, runs as a mapreduce job where each mapper runs a single test
+ * client. Can also run as a non-mapreduce, multithreaded application by
+ * specifying {@code --nomapred}. Each client does about 1GB of data, unless
+ * specified otherwise.
  */
 public class PerformanceEvaluation extends Configured implements Tool {
-  protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
+  private static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  static {
+    MAPPER.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
+  }
 
   public static final String TABLE_NAME = "TestTable";
   public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
-  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
-  public static final int VALUE_LENGTH = 1000;
+  public static final byte [] COLUMN_ZERO = Bytes.toBytes("" + 0);
+  public static final byte [] QUALIFIER_NAME = COLUMN_ZERO;
+  public static final int DEFAULT_VALUE_LENGTH = 1000;
   public static final int ROW_LENGTH = 26;
 
   private static final int ONE_GB = 1024 * 1024 * 1000;
-  private static final int ROWS_PER_GB = ONE_GB / VALUE_LENGTH;
+  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
   // TODO : should we make this configurable
   private static final int TAG_LENGTH = 256;
   private static final DecimalFormat FMT = new DecimalFormat("0.##");
@@ -123,10 +140,45 @@ public class PerformanceEvaluation extends Configured implements Tool {
   private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
   private static final TestOptions DEFAULT_OPTS = new TestOptions();
 
-  protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
-
+  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<String, CmdDescriptor>();
   private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
 
+  static {
+    addCommandDescriptor(RandomReadTest.class, "randomRead",
+      "Run random read test");
+    addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
+      "Run random seek and scan 100 test");
+    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
+      "Run random seek scan with both start and stop row (max 10 rows)");
+    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
+      "Run random seek scan with both start and stop row (max 100 rows)");
+    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
+      "Run random seek scan with both start and stop row (max 1000 rows)");
+    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
+      "Run random seek scan with both start and stop row (max 10000 rows)");
+    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
+      "Run random write test");
+    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
+      "Run sequential read test");
+    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
+      "Run sequential write test");
+    addCommandDescriptor(ScanTest.class, "scan",
+      "Run scan test (read every row)");
+    addCommandDescriptor(FilteredScanTest.class, "filterScan",
+      "Run scan test using a filter to find a specific row based on it's value " +
+      "(make sure to use --rows=20)");
+    addCommandDescriptor(IncrementTest.class, "increment",
+      "Increment on each row; clients overlap on keyspace so some concurrent operations");
+    addCommandDescriptor(AppendTest.class, "append",
+      "Append on each row; clients overlap on keyspace so some concurrent operations");
+    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
+      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
+    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
+      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
+    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
+      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
+  }
+
   /**
    * Enum for map metrics.  Keep it out here rather than inside in the Map
    * inner-class so we can find associated properties.
@@ -138,42 +190,40 @@ public class PerformanceEvaluation extends Configured implements Tool {
     ROWS
   }
 
+  protected static class RunResult implements Comparable<RunResult> {
+    public RunResult(long duration, Histogram hist) {
+      this.duration = duration;
+      this.hist = hist;
+    }
+
+    public final long duration;
+    public final Histogram hist;
+
+    @Override
+    public String toString() {
+      return Long.toString(duration);
+    }
+
+    @Override public int compareTo(RunResult o) {
+      if (this.duration == o.duration) {
+        return 0;
+      }
+      return this.duration > o.duration ? 1 : -1;
+    }
+  }
+
   /**
    * Constructor
    * @param conf Configuration object
    */
   public PerformanceEvaluation(final Configuration conf) {
     super(conf);
-
-    addCommandDescriptor(RandomReadTest.class, "randomRead",
-        "Run random read test");
-    addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
-        "Run random seek and scan 100 test");
-    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
-        "Run random seek scan with both start and stop row (max 10 rows)");
-    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
-        "Run random seek scan with both start and stop row (max 100 rows)");
-    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
-        "Run random seek scan with both start and stop row (max 1000 rows)");
-    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
-        "Run random seek scan with both start and stop row (max 10000 rows)");
-    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
-        "Run random write test");
-    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
-        "Run sequential read test");
-    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
-        "Run sequential write test");
-    addCommandDescriptor(ScanTest.class, "scan",
-        "Run scan test (read every row)");
-    addCommandDescriptor(FilteredScanTest.class, "filterScan",
-        "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)");
   }
 
-  protected void addCommandDescriptor(Class<? extends Test> cmdClass,
+  protected static void addCommandDescriptor(Class<? extends Test> cmdClass,
       String name, String description) {
-    CmdDescriptor cmdDescriptor =
-      new CmdDescriptor(cmdClass, name, description);
-    commands.put(name, cmdDescriptor);
+    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
+    COMMANDS.put(name, cmdDescriptor);
   }
 
   /**
@@ -200,7 +250,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
     public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
 
     private Class<? extends Test> cmd;
-    private PerformanceEvaluation pe;
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
@@ -211,8 +260,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
       Class<? extends PerformanceEvaluation> peClass =
           forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
       try {
-        this.pe = peClass.getConstructor(Configuration.class)
-            .newInstance(context.getConfiguration());
+        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
       } catch (Exception e) {
         throw new IllegalStateException("Could not instantiate PE instance", e);
       }
@@ -226,10 +274,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
       }
     }
 
+    @Override
     protected void map(LongWritable key, Text value, final Context context)
            throws IOException, InterruptedException {
 
       Status status = new Status() {
+        @Override
         public void setStatus(String msg) {
            context.setStatus(msg);
         }
@@ -238,55 +288,80 @@ public class PerformanceEvaluation extends Configured implements Tool {
       ObjectMapper mapper = new ObjectMapper();
       TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
       Configuration conf = HBaseConfiguration.create(context.getConfiguration());
+      final HConnection con = HConnectionManager.createConnection(conf);
 
       // Evaluation task
-      long elapsedTime = this.pe.runOneClient(this.cmd, conf, opts, status);
+      RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, opts, status);
       // Collect how much time the thing took. Report as map output and
       // to the ELAPSED_TIME counter.
-      context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
+      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
       context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
-      context.write(new LongWritable(opts.startRow), new LongWritable(elapsedTime));
+      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
       context.progress();
     }
   }
 
   /*
-   * If table does not already exist, create.
-   * @param c Client to use checking.
-   * @return True if we created the table.
-   * @throws IOException
+   * If table does not already exist, create. Also create a table when
+   * {@code opts.presplitRegions} is specified.
    */
-  private static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
-    HTableDescriptor tableDescriptor = getTableDescriptor(opts);
-    if (opts.presplitRegions > 0) {
-      // presplit requested
-      if (admin.tableExists(tableDescriptor.getTableName())) {
-        admin.disableTable(tableDescriptor.getTableName());
-        admin.deleteTable(tableDescriptor.getTableName());
-      }
-
-      byte[][] splits = getSplits(opts);
-      for (int i=0; i < splits.length; i++) {
-        LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
+  static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
+    TableName tableName = TableName.valueOf(opts.tableName);
+    boolean needsDelete = false, exists = admin.tableExists(tableName);
+    boolean isReadCmd = opts.cmdName.toLowerCase().contains("read")
+      || opts.cmdName.toLowerCase().contains("scan");
+    if (!exists && isReadCmd) {
+      throw new IllegalStateException(
+        "Must specify an existing table for read commands. Run a write command first.");
+    }
+    HTableDescriptor desc =
+      exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null;
+    byte[][] splits = getSplits(opts);
+
+    // recreate the table when user has requested presplit or when existing
+    // {RegionSplitPolicy,replica count} does not match requested.
+    if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
+      || (!isReadCmd && desc != null && desc.getRegionSplitPolicyClassName() != opts.splitPolicy)) {
+      needsDelete = true;
+      // wait, why did it delete my table?!?
+      LOG.debug(Objects.toStringHelper("needsDelete")
+        .add("needsDelete", needsDelete)
+        .add("isReadCmd", isReadCmd)
+        .add("exists", exists)
+        .add("desc", desc)
+        .add("presplit", opts.presplitRegions)
+        .add("splitPolicy", opts.splitPolicy));
+    }
+
+    // remove an existing table
+    if (needsDelete) {
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName);
       }
-      admin.createTable(tableDescriptor, splits);
-      LOG.info ("Table created with " + opts.presplitRegions + " splits");
+      admin.deleteTable(tableName);
     }
-    else {
-      boolean tableExists = admin.tableExists(tableDescriptor.getTableName());
-      if (!tableExists) {
-        admin.createTable(tableDescriptor);
-        LOG.info("Table " + tableDescriptor + " created");
+
+    // table creation is necessary
+    if (!exists || needsDelete) {
+      desc = getTableDescriptor(opts);
+      if (splits != null) {
+        if (LOG.isDebugEnabled()) {
+          for (int i = 0; i < splits.length; i++) {
+            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
+          }
+        }
       }
+      admin.createTable(desc, splits);
+      LOG.info("Table " + desc + " created");
     }
-    return admin.tableExists(tableDescriptor.getTableName());
+    return admin.tableExists(tableName);
   }
 
   /**
    * Create an HTableDescriptor from provided TestOptions.
    */
   protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
-    HTableDescriptor desc = new HTableDescriptor(opts.tableName);
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(opts.tableName));
     HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
     family.setDataBlockEncoding(opts.blockEncoding);
     family.setCompressionType(opts.compression);
@@ -295,6 +370,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
       family.setInMemory(true);
     }
     desc.addFamily(family);
+    if (opts.splitPolicy != DEFAULT_OPTS.splitPolicy) {
+      desc.setRegionSplitPolicyClassName(opts.splitPolicy);
+    }
     return desc;
   }
 
@@ -302,8 +380,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
    * generates splits based on total number of rows and specified split regions
    */
   protected static byte[][] getSplits(TestOptions opts) {
-    if (opts.presplitRegions == 0)
-      return new byte [0][];
+    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
+      return null;
 
     int numSplitPoints = opts.presplitRegions - 1;
     byte[][] splits = new byte[numSplitPoints][];
@@ -317,53 +395,61 @@ public class PerformanceEvaluation extends Configured implements Tool {
 
   /*
    * Run all clients in this vm each to its own thread.
-   * @param cmd Command to run.
-   * @throws IOException
    */
-  private void doLocalClients(final Class<? extends Test> cmd, final TestOptions opts)
+  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
       throws IOException, InterruptedException {
-    Future<Long>[] threads = new Future[opts.numClientThreads];
-    long[] timings = new long[opts.numClientThreads];
+    final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
+    assert cmd != null;
+    @SuppressWarnings("unchecked")
+    Future<RunResult>[] threads = new Future[opts.numClientThreads];
+    RunResult[] results = new RunResult[opts.numClientThreads];
     ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
       new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
+    final HConnection con = HConnectionManager.createConnection(conf);
     for (int i = 0; i < threads.length; i++) {
       final int index = i;
-      threads[i] = pool.submit(new Callable<Long>() {
+      threads[i] = pool.submit(new Callable<RunResult>() {
         @Override
-        public Long call() throws Exception {
+        public RunResult call() throws Exception {
           TestOptions threadOpts = new TestOptions(opts);
-          threadOpts.startRow = index * threadOpts.perClientRunRows;
-          long elapsedTime = runOneClient(cmd, getConf(), threadOpts, new Status() {
+          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
+          RunResult run = runOneClient(cmd, conf, con, threadOpts, new Status() {
+            @Override
             public void setStatus(final String msg) throws IOException {
-              LOG.info("client-" + Thread.currentThread().getName() + " " + msg);
+              LOG.info(msg);
             }
           });
-          LOG.info("Finished " + Thread.currentThread().getName() + " in " + elapsedTime +
+          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
             "ms over " + threadOpts.perClientRunRows + " rows");
-          return elapsedTime;
+          return run;
         }
       });
     }
     pool.shutdown();
+
     for (int i = 0; i < threads.length; i++) {
       try {
-        timings[i] = threads[i].get();
+        results[i] = threads[i].get();
       } catch (ExecutionException e) {
         throw new IOException(e.getCause());
       }
     }
     final String test = cmd.getSimpleName();
     LOG.info("[" + test + "] Summary of timings (ms): "
-             + Arrays.toString(timings));
-    Arrays.sort(timings);
+             + Arrays.toString(results));
+    Arrays.sort(results);
     long total = 0;
-    for (int i = 0; i < timings.length; i++) {
-      total += timings[i];
+    for (RunResult result : results) {
+      total += result.duration;
     }
     LOG.info("[" + test + "]"
-             + "\tMin: " + timings[0] + "ms"
-             + "\tMax: " + timings[timings.length - 1] + "ms"
-             + "\tAvg: " + (total / timings.length) + "ms");
+      + "\tMin: " + results[0] + "ms"
+      + "\tMax: " + results[results.length - 1] + "ms"
+      + "\tAvg: " + (total / results.length) + "ms");
+
+    con.close();
+
+    return results;
   }
 
   /*
@@ -373,15 +459,16 @@ public class PerformanceEvaluation extends Configured implements Tool {
    * @param cmd Command to run.
    * @throws IOException
    */
-  private void doMapReduce(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
-        InterruptedException, ClassNotFoundException {
-    Configuration conf = getConf();
+  static Job doMapReduce(TestOptions opts, final Configuration conf)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
+    assert cmd != null;
     Path inputDir = writeInputFile(conf, opts);
     conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
-    conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
-    Job job = new Job(conf);
+    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
+    Job job = Job.getInstance(conf);
     job.setJarByClass(PerformanceEvaluation.class);
-    job.setJobName("HBase Performance Evaluation");
+    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
 
     job.setInputFormatClass(NLineInputFormat.class);
     NLineInputFormat.setInputPaths(job, inputDir);
@@ -401,12 +488,13 @@ public class PerformanceEvaluation extends Configured implements Tool {
 
     TableMapReduceUtil.addDependencyJars(job);
     TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
-      DescriptiveStatistics.class, // commons-math
-      ObjectMapper.class);         // jackson-mapper-asl
+      Histogram.class,     // yammer metrics
+      ObjectMapper.class); // jackson-mapper-asl
 
     TableMapReduceUtil.initCredentials(job);
 
     job.waitForCompletion(true);
+    return job;
   }
 
   /*
@@ -415,7 +503,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
    * @return Directory that contains file written.
    * @throws IOException
    */
-  private Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
+  private static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
     SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
     Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
     Path inputDir = new Path(jobdir, "inputs");
@@ -429,15 +517,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
     Map<Integer, String> m = new TreeMap<Integer, String>();
     Hash h = MurmurHash.getInstance();
     int perClientRows = (opts.totalRows / opts.numClientThreads);
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
     try {
       for (int i = 0; i < 10; i++) {
         for (int j = 0; j < opts.numClientThreads; j++) {
           TestOptions next = new TestOptions(opts);
           next.startRow = (j * perClientRows) + (i * (perClientRows/10));
           next.perClientRunRows = perClientRows / 10;
-          String s = mapper.writeValueAsString(next);
+          String s = MAPPER.writeValueAsString(next);
+          LOG.info("maptask input=" + s);
           int hash = h.hash(Bytes.toBytes(s));
           m.put(hash, s);
         }
@@ -481,54 +568,350 @@ public class PerformanceEvaluation extends Configured implements Tool {
   /**
    * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
    * This makes tracking all these arguments a little easier.
+   * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON
+   * serialization of this TestOptions class behave), and you need to add to the clone constructor
+   * below copying your new option from the 'that' to the 'this'.  Look for 'clone' below.
    */
   static class TestOptions {
+    String cmdName = null;
+    boolean nomapred = false;
+    boolean filterAll = false;
+    int startRow = 0;
+    float size = 1.0f;
+    int perClientRunRows = DEFAULT_ROWS_PER_GB;
+    int numClientThreads = 1;
+    int totalRows = DEFAULT_ROWS_PER_GB;
+    float sampleRate = 1.0f;
+    double traceRate = 0.0;
+    String tableName = TABLE_NAME;
+    boolean flushCommits = true;
+    boolean writeToWAL = true;
+    boolean autoFlush = false;
+    boolean oneCon = false;
+    boolean useTags = false;
+    int noOfTags = 1;
+    boolean reportLatency = false;
+    int multiGet = 0;
+    int randomSleep = 0;
+    boolean inMemoryCF = false;
+    int presplitRegions = 0;
+    String splitPolicy = null;
+    Compression.Algorithm compression = Compression.Algorithm.NONE;
+    BloomType bloomType = BloomType.ROW;
+    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
+    boolean valueRandom = false;
+    boolean valueZipf = false;
+    int valueSize = DEFAULT_VALUE_LENGTH;
+    int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
+    int columns = 1;
+    int caching = 30;
+    boolean addColumns = true;
 
     public TestOptions() {}
 
+    /**
+     * Clone constructor.
+     * @param that Object to copy from.
+     */
     public TestOptions(TestOptions that) {
+      this.cmdName = that.cmdName;
       this.nomapred = that.nomapred;
       this.startRow = that.startRow;
+      this.size = that.size;
       this.perClientRunRows = that.perClientRunRows;
       this.numClientThreads = that.numClientThreads;
       this.totalRows = that.totalRows;
       this.sampleRate = that.sampleRate;
+      this.traceRate = that.traceRate;
       this.tableName = that.tableName;
       this.flushCommits = that.flushCommits;
       this.writeToWAL = that.writeToWAL;
+      this.autoFlush = that.autoFlush;
+      this.oneCon = that.oneCon;
       this.useTags = that.useTags;
       this.noOfTags = that.noOfTags;
       this.reportLatency = that.reportLatency;
       this.multiGet = that.multiGet;
       this.inMemoryCF = that.inMemoryCF;
       this.presplitRegions = that.presplitRegions;
+      this.splitPolicy = that.splitPolicy;
       this.compression = that.compression;
       this.blockEncoding = that.blockEncoding;
       this.filterAll = that.filterAll;
       this.bloomType = that.bloomType;
+      this.valueRandom = that.valueRandom;
+      this.valueZipf = that.valueZipf;
+      this.valueSize = that.valueSize;
+      this.period = that.period;
+      this.randomSleep = that.randomSleep;
       this.addColumns = that.addColumns;
+      this.columns = that.columns;
+      this.caching = that.caching;
     }
 
-    public boolean nomapred = false;
-    public boolean filterAll = false;
-    public int startRow = 0;
-    public int perClientRunRows = ROWS_PER_GB;
-    public int numClientThreads = 1;
-    public int totalRows = ROWS_PER_GB;
-    public float sampleRate = 1.0f;
-    public String tableName = TABLE_NAME;
-    public boolean flushCommits = true;
-    public boolean writeToWAL = true;
-    public boolean useTags = false;
-    public int noOfTags = 1;
-    public boolean reportLatency = false;
-    public int multiGet = 0;
-    boolean inMemoryCF = false;
-    int presplitRegions = 0;
-    public Compression.Algorithm compression = Compression.Algorithm.NONE;
-    public BloomType bloomType = BloomType.ROW;
-    public DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
-    boolean addColumns = true;
+    public int getCaching() {
+      return this.caching;
+    }
+
+    public void setCaching(final int caching) {
+      this.caching = caching;
+    }
+
+    public int getColumns() {
+      return this.columns;
+    }
+
+    public void setColumns(final int columns) {
+      this.columns = columns;
+    }
+
+    public boolean isValueZipf() {
+      return valueZipf;
+    }
+
+    public void setValueZipf(boolean valueZipf) {
+      this.valueZipf = valueZipf;
+    }
+
+    public String getCmdName() {
+      return cmdName;
+    }
+
+    public void setCmdName(String cmdName) {
+      this.cmdName = cmdName;
+    }
+
+    public int getRandomSleep() {
+      return randomSleep;
+    }
+
+    public void setRandomSleep(int randomSleep) {
+      this.randomSleep = randomSleep;
+    }
+
+    public String getSplitPolicy() {
+      return splitPolicy;
+    }
+
+    public void setSplitPolicy(String splitPolicy) {
+      this.splitPolicy = splitPolicy;
+    }
+
+    public void setNomapred(boolean nomapred) {
+      this.nomapred = nomapred;
+    }
+
+    public void setFilterAll(boolean filterAll) {
+      this.filterAll = filterAll;
+    }
+
+    public void setStartRow(int startRow) {
+      this.startRow = startRow;
+    }
+
+    public void setSize(float size) {
+      this.size = size;
+    }
+
+    public void setPerClientRunRows(int perClientRunRows) {
+      this.perClientRunRows = perClientRunRows;
+    }
+
+    public void setNumClientThreads(int numClientThreads) {
+      this.numClientThreads = numClientThreads;
+    }
+
+    public void setTotalRows(int totalRows) {
+      this.totalRows = totalRows;
+    }
+
+    public void setSampleRate(float sampleRate) {
+      this.sampleRate = sampleRate;
+    }
+
+    public void setTraceRate(double traceRate) {
+      this.traceRate = traceRate;
+    }
+
+    public void setTableName(String tableName) {
+      this.tableName = tableName;
+    }
+
+    public void setFlushCommits(boolean flushCommits) {
+      this.flushCommits = flushCommits;
+    }
+
+    public void setWriteToWAL(boolean writeToWAL) {
+      this.writeToWAL = writeToWAL;
+    }
+
+    public void setAutoFlush(boolean autoFlush) {
+      this.autoFlush = autoFlush;
+    }
+
+    public void setOneCon(boolean oneCon) {
+      this.oneCon = oneCon;
+    }
+
+    public void setUseTags(boolean useTags) {
+      this.useTags = useTags;
+    }
+
+    public void setNoOfTags(int noOfTags) {
+      this.noOfTags = noOfTags;
+    }
+
+    public void setReportLatency(boolean reportLatency) {
+      this.reportLatency = reportLatency;
+    }
+
+    public void setMultiGet(int multiGet) {
+      this.multiGet = multiGet;
+    }
+
+    public void setInMemoryCF(boolean inMemoryCF) {
+      this.inMemoryCF = inMemoryCF;
+    }
+
+    public void setPresplitRegions(int presplitRegions) {
+      this.presplitRegions = presplitRegions;
+    }
+
+    public void setCompression(Compression.Algorithm compression) {
+      this.compression = compression;
+    }
+
+    public void setBloomType(BloomType bloomType) {
+      this.bloomType = bloomType;
+    }
+
+    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
+      this.blockEncoding = blockEncoding;
+    }
+
+    public void setValueRandom(boolean valueRandom) {
+      this.valueRandom = valueRandom;
+    }
+
+    public void setValueSize(int valueSize) {
+      this.valueSize = valueSize;
+    }
+
+    public void setPeriod(int period) {
+      this.period = period;
+    }
+
+    public boolean isNomapred() {
+      return nomapred;
+    }
+
+    public boolean isFilterAll() {
+      return filterAll;
+    }
+
+    public int getStartRow() {
+      return startRow;
+    }
+
+    public float getSize() {
+      return size;
+    }
+
+    public int getPerClientRunRows() {
+      return perClientRunRows;
+    }
+
+    public int getNumClientThreads() {
+      return numClientThreads;
+    }
+
+    public int getTotalRows() {
+      return totalRows;
+    }
+
+    public float getSampleRate() {
+      return sampleRate;
+    }
+
+    public double getTraceRate() {
+      return traceRate;
+    }
+
+    public String getTableName() {
+      return tableName;
+    }
+
+    public boolean isFlushCommits() {
+      return flushCommits;
+    }
+
+    public boolean isWriteToWAL() {
+      return writeToWAL;
+    }
+
+    public boolean isAutoFlush() {
+      return autoFlush;
+    }
+
+    public boolean isUseTags() {
+      return useTags;
+    }
+
+    public int getNoOfTags() {
+      return noOfTags;
+    }
+
+    public boolean isReportLatency() {
+      return reportLatency;
+    }
+
+    public int getMultiGet() {
+      return multiGet;
+    }
+
+    public boolean isInMemoryCF() {
+      return inMemoryCF;
+    }
+
+    public int getPresplitRegions() {
+      return presplitRegions;
+    }
+
+    public Compression.Algorithm getCompression() {
+      return compression;
+    }
+
+    public DataBlockEncoding getBlockEncoding() {
+      return blockEncoding;
+    }
+
+    public boolean isValueRandom() {
+      return valueRandom;
+    }
+
+    public int getValueSize() {
+      return valueSize;
+    }
+
+    public int getPeriod() {
+      return period;
+    }
+
+    public BloomType getBloomType() {
+      return bloomType;
+    }
+
+    public boolean isOneCon() {
+      return oneCon;
+    }
+
+    public boolean getAddColumns() {
+      return addColumns;
+    }
+
+    public void setAddColumns(boolean addColumns) {
+      this.addColumns = addColumns;
+    }
   }
 
   /*
@@ -539,56 +922,125 @@ public class PerformanceEvaluation extends Configured implements Tool {
     // Below is make it so when Tests are all running in the one
     // jvm, that they each have a differently seeded Random.
     private static final Random randomSeed = new Random(System.currentTimeMillis());
+
     private static long nextRandomSeed() {
       return randomSeed.nextLong();
     }
+    private final int everyN;
+
     protected final Random rand = new Random(nextRandomSeed());
     protected final Configuration conf;
     protected final TestOptions opts;
 
     private final Status status;
+    private final Sampler<?> traceSampler;
+    private final SpanReceiverHost receiverHost;
     protected HConnection connection;
-    protected HTableInterface table;
+
+    private String testName;
+    private Histogram latency;
+    private Histogram valueSize;
+    private RandomDistribution.Zipf zipf;
 
     /**
-     * Note that all subclasses of this class must provide a public contructor
+     * Note that all subclasses of this class must provide a public constructor
      * that has the exact same list of arguments.
      */
-    Test(final Configuration conf, final TestOptions options, final Status status) {
-      this.conf = conf;
+    Test(final HConnection con, final TestOptions options, final Status status) {
+      this.connection = con;
+      this.conf = con == null ? HBaseConfiguration.create() : this.connection.getConfiguration();
       this.opts = options;
       this.status = status;
+      this.testName = this.getClass().getSimpleName();
+      receiverHost = SpanReceiverHost.getInstance(conf);
+      if (options.traceRate >= 1.0) {
+        this.traceSampler = Sampler.ALWAYS;
+      } else if (options.traceRate > 0.0) {
+        this.traceSampler = new ProbabilitySampler(options.traceRate);
+      } else {
+        this.traceSampler = Sampler.NEVER;
+      }
+      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
+      if (options.isValueZipf()) {
+        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.1);
+      }
+      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
     }
 
-    private String generateStatus(final int sr, final int i, final int lr) {
-      return sr + "/" + i + "/" + lr;
+    int getValueLength(final Random r) {
+      if (this.opts.isValueRandom()) return Math.abs(r.nextInt() % opts.valueSize);
+      else if (this.opts.isValueZipf()) return Math.abs(this.zipf.nextInt());
+      else return opts.valueSize;
+    }
+
+    void updateValueSize(final Result [] rs) throws IOException {
+      if (rs == null || !isRandomValueSize()) return;
+      for (Result r: rs) updateValueSize(r);
+    }
+
+    void updateValueSize(final Result r) throws IOException {
+      if (r == null || !isRandomValueSize()) return;
+      int size = 0;
+      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
+        size += scanner.current().getValueLength();
+      }
+      updateValueSize(size);
+    }
+
+    void updateValueSize(final int valueSize) {
+      if (!isRandomValueSize()) return;
+      this.valueSize.update(valueSize);
+    }
+
+    String generateStatus(final int sr, final int i, final int lr) {
+      return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
+        (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
+    }
+
+    boolean isRandomValueSize() {
+      return opts.valueRandom;
     }
 
     protected int getReportingPeriod() {
-      int period = opts.perClientRunRows / 10;
-      return period == 0 ? opts.perClientRunRows : period;
+      return opts.period;
+    }
+
+    /**
+     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
+     */
+    public Histogram getLatency() {
+      return latency;
     }
 
     void testSetup() throws IOException {
-      this.connection = HConnectionManager.createConnection(conf);
-      this.table = connection.getTable(opts.tableName);
-      this.table.setAutoFlush(false, true);
+      if (!opts.oneCon) {
+        this.connection = HConnectionManager.createConnection(conf);
+      }
+      onStartup();
+      latency = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
+      valueSize = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
     }
 
+    abstract void onStartup() throws IOException;
+
     void testTakedown() throws IOException {
-      if (opts.flushCommits) {
-        this.table.flushCommits();
+      reportLatency();
+      reportValueSize();
+      onTakedown();
+      if (!opts.oneCon) {
+        connection.close();
       }
-      table.close();
-      connection.close();
+      receiverHost.closeReceivers();
     }
 
+    abstract void onTakedown() throws IOException;
+
     /*
      * Run test
      * @return Elapsed time.
      * @throws IOException
      */
-    long test() throws IOException {
+    long test() throws IOException, InterruptedException {
       testSetup();
       LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
       final long startTime = System.nanoTime();
@@ -600,40 +1052,121 @@ public class PerformanceEvaluation extends Configured implements Tool {
       return (System.nanoTime() - startTime) / 1000000;
     }
 
+    int getStartRow() {
+      return opts.startRow;
+    }
+
+    int getLastRow() {
+      return getStartRow() + opts.perClientRunRows;
+    }
+
     /**
      * Provides an extension point for tests that don't want a per row invocation.
      */
-    void testTimed() throws IOException {
-      int lastRow = opts.startRow + opts.perClientRunRows;
+    void testTimed() throws IOException, InterruptedException {
+      int startRow = getStartRow();
+      int lastRow = getLastRow();
       // Report on completion of 1/10th of total.
-      for (int i = opts.startRow; i < lastRow; i++) {
-        testRow(i);
+      for (int i = startRow; i < lastRow; i++) {
+        if (i % everyN != 0) continue;
+        long startTime = System.nanoTime();
+        TraceScope scope = Trace.startSpan("test row", traceSampler);
+        try {
+          testRow(i);
+        } finally {
+          scope.close();
+        }
+        latency.update((System.nanoTime() - startTime) / 1000);
         if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
-          status.setStatus(generateStatus(opts.startRow, i, lastRow));
+          status.setStatus(generateStatus(startRow, i, lastRow));
         }
       }
     }
 
+    /**
+     * report percentiles of latency
+     * @throws IOException
+     */
+    private void reportLatency() throws IOException {
+      status.setStatus(testName + " latency log (microseconds), on " +
+          latency.count() + " measures");
+      reportHistogram(this.latency);
+    }
+
+    private void reportValueSize() throws IOException {
+      status.setStatus(testName + " valueSize after " +
+          valueSize.count() + " measures");
+      reportHistogram(this.valueSize);
+    }
+
+    private void reportHistogram(final Histogram h) throws IOException {
+      Snapshot sn = h.getSnapshot();
+      status.setStatus(testName + " Min      = " + h.min());
+      status.setStatus(testName + " Avg      = " + h.mean());
+      status.setStatus(testName + " StdDev   = " + h.stdDev());
+      status.setStatus(testName + " 50th     = " + sn.getMedian());
+      status.setStatus(testName + " 75th     = " + sn.get75thPercentile());
+      status.setStatus(testName + " 95th     = " + sn.get95thPercentile());
+      status.setStatus(testName + " 99th     = " + sn.get99thPercentile());
+      status.setStatus(testName + " 99.9th   = " + sn.get999thPercentile());
+      status.setStatus(testName + " 99.99th  = " + sn.getValue(0.9999));
+      status.setStatus(testName + " 99.999th = " + sn.getValue(0.99999));
+      status.setStatus(testName + " Max      = " + h.max());
+    }
+
+    /**
+     * @return Subset of the histograms' calculation.
+     */
+    public String getShortLatencyReport() {
+      return YammerHistogramUtils.getShortHistogramReport(this.latency);
+    }
+
+    /**
+     * @return Subset of the histograms' calculation.
+     */
+    public String getShortValueSizeReport() {
+      return YammerHistogramUtils.getShortHistogramReport(this.valueSize);
+    }
+
     /*
     * Test for individual row.
     * @param i Row index.
     */
-    abstract void testRow(final int i) throws IOException;
+    abstract void testRow(final int i) throws IOException, InterruptedException;
   }
 
+  static abstract class TableTest extends Test {
+    protected HTableInterface table;
 
-  @SuppressWarnings("unused")
-  static class RandomSeekScanTest extends Test {
-    RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
+    TableTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void onStartup() throws IOException {
+      this.table = connection.getTable(TableName.valueOf(opts.tableName));
+    }
+
+    @Override
+    void onTakedown() throws IOException {
+      table.close();
+    }
+  }
+
+  static class RandomSeekScanTest extends TableTest {
+    RandomSeekScanTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
     }
 
     @Override
     void testRow(final int i) throws IOException {
       Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows));
+      scan.setCaching(opts.caching);
       FilterList list = new FilterList();
       if (opts.addColumns) {
         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        scan.addFamily(FAMILY_NAME);
       }
       if (opts.filterAll) {
         list.addFilter(new FilterAllFilter());
@@ -641,7 +1174,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
       list.addFilter(new WhileMatchFilter(new PageFilter(120)));
       scan.setFilter(list);
       ResultScanner s = this.table.getScanner(scan);
-      for (Result rr; (rr = s.next()) != null;) ;
+      for (Result rr; (rr = s.next()) != null;) {
+        updateValueSize(rr);
+      }
       s.close();
     }
 
@@ -653,28 +1188,31 @@ public class PerformanceEvaluation extends Configured implements Tool {
 
   }
 
-  @SuppressWarnings("unused")
-  static abstract class RandomScanWithRangeTest extends Test {
-    RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
+  static abstract class RandomScanWithRangeTest extends TableTest {
+    RandomScanWithRangeTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
     }
 
     @Override
     void testRow(final int i) throws IOException {
       Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
       Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
+      scan.setCaching(opts.caching);
       if (opts.filterAll) {
         scan.setFilter(new FilterAllFilter());
       }
       if (opts.addColumns) {
         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        scan.addFamily(FAMILY_NAME);
       }
-      ResultScanner s = this.table.getScanner(scan);
+      Result r = null;
       int count = 0;
-      for (Result rr; (rr = s.next()) != null;) {
+      ResultScanner s = this.table.getScanner(scan);
+      for (; (r = s.next()) != null;) {
+        updateValueSize(r);
         count++;
       }
-
       if (i % 100 == 0) {
         LOG.info(String.format("Scan for key range %s - %s returned %s rows",
             Bytes.toString(startAndStopRow.getFirst()),
@@ -700,8 +1238,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
   }
 
   static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
-    RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
+    RandomScanWithRange10Test(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
     }
 
     @Override
@@ -711,8 +1249,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
   }
 
   static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
-    RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
+    RandomScanWithRange100Test(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
     }
 
     @Override
@@ -722,8 +1260,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
   }
 
   static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
-    RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
+    RandomScanWithRange1000Test(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
     }
 
     @Override
@@ -733,8 +1271,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
   }
 
   static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
-    RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
+    RandomScanWithRange10000Test(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
     }
 
     @Override
@@ -743,60 +1281,48 @@ public class PerformanceEvaluation extends Configured implements Tool {
     }
   }
 
-  static class RandomReadTest extends Test {
-    private final int everyN;
-    private final double[] times;
+  static class RandomReadTest extends TableTest {
     private ArrayList<Get> gets;
-    int idx = 0;
+    private Random rd = new Random();
 
-    RandomReadTest(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
-      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
-      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
+    RandomReadTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
       if (opts.multiGet > 0) {
         LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
         this.gets = new ArrayList<Get>(opts.multiGet);
       }
-      if (opts.reportLatency) {
-        this.times = new double[(int) Math.ceil(opts.perClientRunRows * opts.sampleRate / Math.max(1, opts.multiGet))];
-      } else {
-        this.times = null;
-      }
     }
 
     @Override
-    void testRow(final int i) throws IOException {
-      if (i % everyN == 0) {
-        Get get = new Get(getRandomRow(this.rand, opts.totalRows));
-        if (opts.addColumns) {
-          get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
-        }
-        if (opts.filterAll) {
-          get.setFilter(new FilterAllFilter());
-        }
-        if (opts.multiGet > 0) {
-          this.gets.add(get);
-          if (this.gets.size() == opts.multiGet) {
-            long start = System.nanoTime();
-            this.table.get(this.gets);
-            if (opts.reportLatency) {
-              times[idx++] = (System.nanoTime() - start) / 1e6;
-            }
-            this.gets.clear();
-          }
-        } else {
-          long start = System.nanoTime();
-          this.table.get(get);
-          if (opts.reportLatency) {
-            times[idx++] = (System.nanoTime() - start) / 1e6;
-          }
+    void testRow(final int i) throws IOException, InterruptedException {
+      if (opts.randomSleep > 0) {
+        Thread.sleep(rd.nextInt(opts.randomSleep));
+      }
+      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
+      if (opts.addColumns) {
+        get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        get.addFamily(FAMILY_NAME);
+      }
+      if (opts.filterAll) {
+        get.setFilter(new FilterAllFilter());
+      }
+      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
+      if (opts.multiGet > 0) {
+        this.gets.add(get);
+        if (this.gets.size() == opts.multiGet) {
+          Result [] rs = this.table.get(this.gets);
+          updateValueSize(rs);
+          this.gets.clear();
         }
+      } else {
+        updateValueSize(this.table.get(get));
       }
     }
 
     @Override
     protected int getReportingPeriod() {
-      int period = opts.perClientRunRows / 100;
+      int period = opts.perClientRunRows / 10;
       return period == 0 ? opts.perClientRunRows : period;
     }
 
@@ -807,61 +1333,47 @@ public class PerformanceEvaluation extends Configured implements Tool {
         this.gets.clear();
       }
       super.testTakedown();
-      if (opts.reportLatency) {
-        Arrays.sort(times);
-        DescriptiveStatistics ds = new DescriptiveStatistics();
-        for (double t : times) {
-          ds.addValue(t);
-        }
-        LOG.info("randomRead latency log (ms), on " + times.length + " measures");
-        LOG.info("99.9999% = " + ds.getPercentile(99.9999d));
-        LOG.info(" 99.999% = " + ds.getPercentile(99.999d));
-        LOG.info("  99.99% = " + ds.getPercentile(99.99d));
-        LOG.info("   99.9% = " + ds.getPercentile(99.9d));
-        LOG.info("     99% = " + ds.getPercentile(99d));
-        LOG.info("     95% = " + ds.getPercentile(95d));
-        LOG.info("     90% = " + ds.getPercentile(90d));
-        LOG.info("     80% = " + ds.getPercentile(80d));
-        LOG.info("Standard Deviation = " + ds.getStandardDeviation());
-        LOG.info("Mean = " + ds.getMean());
-      }
     }
   }
 
-  static class RandomWriteTest extends Test {
-    RandomWriteTest(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
+  static class RandomWriteTest extends TableTest {
+    RandomWriteTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
     }
 
     @Override
     void testRow(final int i) throws IOException {
       byte[] row = getRandomRow(this.rand, opts.totalRows);
       Put put = new Put(row);
-      byte[] value = generateData(this.rand, VALUE_LENGTH);
-      if (opts.useTags) {
-        byte[] tag = generateData(this.rand, TAG_LENGTH);
-        Tag[] tags = new Tag[opts.noOfTags];
-        for (int n = 0; n < opts.noOfTags; n++) {
-          Tag t = new Tag((byte) n, tag);
-          tags[n] = t;
+      for (int column = 0; column < opts.columns; column++) {
+        byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
+        byte[] value = generateData(this.rand, getValueLength(this.rand));
+        if (opts.useTags) {
+          byte[] tag = generateData(this.rand, TAG_LENGTH);
+          Tag[] tags = new Tag[opts.noOfTags];
+          for (int n = 0; n < opts.noOfTags; n++) {
+            Tag t = new Tag((byte) n, tag);
+            tags[n] = t;
+          }
+          KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
+              value, tags);
+          put.add(kv);
+          updateValueSize(kv.getValueLength());
+        } else {
+          put.add(FAMILY_NAME, qualifier, value);
+          updateValueSize(value.length);
         }
-        KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
-            value, tags);
-        put.add(kv);
-      } else {
-        put.add(FAMILY_NAME, QUALIFIER_NAME, value);
       }
       put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
       table.put(put);
     }
   }
 
-
-  static class ScanTest extends Test {
+  static class ScanTest extends TableTest {
     private ResultScanner testScanner;
 
-    ScanTest(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
+    ScanTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
     }
 
     @Override
@@ -877,23 +1389,135 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void testRow(final int i) throws IOException {
       if (this.testScanner == null) {
         Scan scan = new Scan(format(opts.startRow));
-        scan.setCaching(30);
+        scan.setCaching(opts.caching);
         if (opts.addColumns) {
           scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+        } else {
+          scan.addFamily(FAMILY_NAME);
         }
         if (opts.filterAll) {
           scan.setFilter(new FilterAllFilter());
         }
        this.testScanner = table.getScanner(scan);
       }
-      testScanner.next();
+      Result r = testScanner.next();
+      updateValueSize(r);
+    }
+  }
+
+  /**
+   * Base class for operations that are CAS-like; that read a value and then set it based off what
+   * they read. In this category is increment, append, checkAndPut, etc.
+   *
+   * <p>These operations also want some concurrency going on. Usually when these tests run, they
+   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
+   * same key space. We do this with our getStartRow and getLastRow overrides.
+   */
+  static abstract class CASTableTest extends TableTest {
+    private final byte [] qualifier;
+    CASTableTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
+      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
+    }
+
+    byte [] getQualifier() {
+      return this.qualifier;
     }
 
+    @Override
+    int getStartRow() {
+      return 0;
+    }
+
+    @Override
+    int getLastRow() {
+      return opts.perClientRunRows;
+    }
+  }
+
+  static class IncrementTest extends CASTableTest {
+    IncrementTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      Increment increment = new Increment(format(i));
+      increment.addColumn(FAMILY_NAME, getQualifier(), 1l);
+      updateValueSize(this.table.increment(increment));
+    }
   }
 
-  static class SequentialReadTest extends Test {
-    SequentialReadTest(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
+  static class AppendTest extends CASTableTest {
+    AppendTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      byte [] bytes = format(i);
+      Append append = new Append(bytes);
+      append.add(FAMILY_NAME, getQualifier(), bytes);
+      updateValueSize(this.table.append(append));
+    }
+  }
+
+  static class CheckAndMutateTest extends CASTableTest {
+    CheckAndMutateTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      byte [] bytes = format(i);
+      // Put a known value so when we go to check it, it is there.
+      Put put = new Put(bytes);
+      put.add(FAMILY_NAME, getQualifier(), bytes);
+      this.table.put(put);
+      RowMutations mutations = new RowMutations(bytes);
+      mutations.add(put);
+      this.table.checkAndMutate(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes,
+          mutations);
+    }
+  }
+
+  static class CheckAndPutTest extends CASTableTest {
+    CheckAndPutTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      byte [] bytes = format(i);
+      // Put a known value so when we go to check it, it is there.
+      Put put = new Put(bytes);
+      put.add(FAMILY_NAME, getQualifier(), bytes);
+      this.table.put(put);
+      this.table.checkAndPut(bytes, FAMILY_NAME, getQualifier(), bytes, put);
+    }
+  }
+
+  static class CheckAndDeleteTest extends CASTableTest {
+    CheckAndDeleteTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      byte [] bytes = format(i);
+      // Put a known value so when we go to check it, it is there.
+      Put put = new Put(bytes);
+      put.add(FAMILY_NAME, getQualifier(), bytes);
+      this.table.put(put);
+      Delete delete = new Delete(put.getRow());
+      delete.deleteColumn(FAMILY_NAME, getQualifier());
+      this.table.checkAndDelete(bytes, FAMILY_NAME, getQualifier(), bytes, delete);
+    }
+  }
+
+  static class SequentialReadTest extends TableTest {
+    SequentialReadTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
     }
 
     @Override
@@ -905,53 +1529,59 @@ public class PerformanceEvaluation extends Configured implements Tool {
       if (opts.filterAll) {
         get.setFilter(new FilterAllFilter());
       }
-      table.get(get);
+      updateValueSize(table.get(get));
     }
   }
 
-  static class SequentialWriteTest extends Test {
-    SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
+  static class SequentialWriteTest extends TableTest {
+    SequentialWriteTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
     }
 
     @Override
     void testRow(final int i) throws IOException {
       byte[] row = format(i);
       Put put = new Put(row);
-      byte[] value = generateData(this.rand, VALUE_LENGTH);
-      if (opts.useTags) {
-        byte[] tag = generateData(this.rand, TAG_LENGTH);
-        Tag[] tags = new Tag[opts.noOfTags];
-        for (int n = 0; n < opts.noOfTags; n++) {
-          Tag t = new Tag((byte) n, tag);
-          tags[n] = t;
+      for (int column = 0; column < opts.columns; column++) {
+        byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
+        byte[] value = generateData(this.rand, getValueLength(this.rand));
+        if (opts.useTags) {
+          byte[] tag = generateData(this.rand, TAG_LENGTH);
+          Tag[] tags = new Tag[opts.noOfTags];
+          for (int n = 0; n < opts.noOfTags; n++) {
+            Tag t = new Tag((byte) n, tag);
+            tags[n] = t;
+          }
+          KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
+              value, tags);
+          put.add(kv);
+          updateValueSize(kv.getValueLength());
+        } else {
+          put.add(FAMILY_NAME, qualifier, value);
+          updateValueSize(value.length);
         }
-        KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
-            value, tags);
-        put.add(kv);
-      } else {
-        put.add(FAMILY_NAME, QUALIFIER_NAME, value);
       }
       put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
       table.put(put);
     }
   }
 
-  static class FilteredScanTest extends Test {
+  static class FilteredScanTest extends TableTest {
     protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
 
-    FilteredScanTest(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
+    FilteredScanTest(HConnection con, TestOptions options, Status status) {
+      super(con, options, status);
     }
 
     @Override
     void testRow(int i) throws IOException {
-      byte[] value = generateData(this.rand, VALUE_LENGTH);
+      byte[] value = generateData(this.rand, getValueLength(this.rand));
       Scan scan = constructScan(value);
       ResultScanner scanner = null;
       try {
         scanner = this.table.getScanner(scan);
-        while (scanner.next() != null) {
+        for (Result r = null; (r = scanner.next()) != null;) {
+          updateValueSize(r);
         }
       } finally {
         if (scanner != null) scanner.close();
@@ -961,7 +1591,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     protected Scan constructScan(byte[] valuePrefix) throws IOException {
       FilterList list = new FilterList();
       Filter filter = new SingleColumnValueFilter(
-          FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
+          FAMILY_NAME, COLUMN_ZERO, CompareFilter.CompareOp.EQUAL,
           new BinaryComparator(valuePrefix)
       );
       list.addFilter(filter);
@@ -969,8 +1599,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
         list.addFilter(new FilterAllFilter());
       }
       Scan scan = new Scan();
+      scan.setCaching(opts.caching);
       if (opts.addColumns) {
         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        scan.addFamily(FAMILY_NAME);
       }
       scan.setFilter(list);
       return scan;
@@ -983,11 +1616,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
    * @param timeMs Time taken in milliseconds.
    * @return String value with label, ie '123.76 MB/s'
    */
-  private static String calculateMbps(int rows, long timeMs) {
-    // MB/s = ((totalRows * ROW_SIZE_BYTES) / totalTimeMS)
-    //        * 1000 MS_PER_SEC / (1024 * 1024) BYTES_PER_MB
-    BigDecimal rowSize =
-      BigDecimal.valueOf(ROW_LENGTH + VALUE_LENGTH + FAMILY_NAME.length + QUALIFIER_NAME.length);
+  private static String calculateMbps(int rows, long timeMs, final int valueSize, int columns) {
+    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
+      ((valueSize + FAMILY_NAME.length + COLUMN_ZERO.length) * columns));
     BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
       .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
       .divide(BYTES_PER_MB, CXT);
@@ -1018,7 +1649,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
    */
   public static byte[] generateData(final Random r, int length) {
     byte [] b = new byte [length];
-    int i = 0;
+    int i;
 
     for(i = 0; i < (length-8); i += 8) {
       b[i] = (byte) (65 + r.nextInt(26));
@@ -1044,25 +1675,25 @@ public class PerformanceEvaluation extends Configured implements Tool {
    */
   @Deprecated
   public static byte[] generateValue(final Random r) {
-    return generateData(r, VALUE_LENGTH);
+    return generateData(r, DEFAULT_VALUE_LENGTH);
   }
 
   static byte [] getRandomRow(final Random random, final int totalRows) {
     return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
   }
 
-  static long runOneClient(final Class<? extends Test> cmd, Configuration conf, TestOptions opts,
-    final Status status)
-      throws IOException {
+  static RunResult runOneClient(final Class<? extends Test> cmd, Configuration conf, HConnection con,
+                           TestOptions opts, final Status status)
+      throws IOException, InterruptedException {
     status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " +
       opts.perClientRunRows + " rows");
-    long totalElapsedTime = 0;
+    long totalElapsedTime;
 
     final Test t;
     try {
       Constructor<? extends Test> constructor =
-        cmd.getDeclaredConstructor(Configuration.class, TestOptions.class, Status.class);
-      t = constructor.newInstance(conf, opts, status);
+        cmd.getDeclaredConstructor(HConnection.class, TestOptions.class, Status.class);
+      t = constructor.newInstance(con, opts, status);
     } catch (NoSuchMethodException e) {
       throw new IllegalArgumentException("Invalid command class: " +
           cmd.getName() + ".  It does not provide a constructor as described by " +
@@ -1075,50 +1706,74 @@ public class PerformanceEvaluation extends Configured implements Tool {
 
     status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
       "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
-      " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime) + ")");
-    return totalElapsedTime;
+      " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
+          getAverageValueLength(opts), opts.columns) + ")");
+
+    return new RunResult(totalElapsedTime, t.getLatency());
+  }
+
+  private static int getAverageValueLength(final TestOptions opts) {
+    return opts.valueRandom? opts.valueSize/2: opts.valueSize;
   }
 
   private void runTest(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
       InterruptedException, ClassNotFoundException {
-    HBaseAdmin admin = null;
+    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
+    // the TestOptions introspection for us and dump the output in a readable format.
+    LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts));
+    HBaseAdmin admin = new HBaseAdmin(getConf());
     try {
-      admin = new HBaseAdmin(getConf());
       checkTable(admin, opts);
     } finally {
-      if (admin != null) admin.close();
+      admin.close();
     }
+
     if (opts.nomapred) {
-      doLocalClients(cmd, opts);
+      doLocalClients(opts, getConf());
     } else {
-      doMapReduce(cmd, opts);
+      doMapReduce(opts, getConf());
     }
   }
 
   protected void printUsage() {
-    printUsage(null);
+    printUsage(this.getClass().getName(), null);
   }
 
-  protected void printUsage(final String message) {
+  protected static void printUsage(final String message) {
+    printUsage(PerformanceEvaluation.class.getName(), message);
+  }
+
+  protected static void printUsageAndExit(final String message, final int exitCode) {
+    printUsage(message);
+    System.exit(exitCode);
+  }
+
+  protected static void printUsage(final String className, final String message) {
     if (message != null && message.length() > 0) {
       System.err.println(message);
     }
-    System.err.println("Usage: java " + this.getClass().getName() + " \\");
-    System.err.println("  [--nomapred] [--rows=ROWS] [--table=NAME] \\");
-    System.err.println("  [--compress=TYPE] [--blockEncoding=TYPE] " +
-      "[-D<property=value>]* <command> <nclients>");
+    System.err.println("Usage: java " + className + " \\");
+    System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
     System.err.println();
     System.err.println("Options:");
     System.err.println(" nomapred        Run multiple clients using threads " +
       "(rather than use mapreduce)");
     System.err.println(" rows            Rows each client runs. Default: One million");
+    System.err.println(" size            Total size in GiB. Mutually exclusive with --rows. " +
+      "Default: 1.0.");
     System.err.println(" sampleRate      Execute test on a sample of total " +
       "rows. Only supported by randomRead. Default: 1.0");
+    System.err.println(" traceRate       Enable HTrace spans. Initiate tracing every N rows. " +
+      "Default: 0");
     System.err.println(" table           Alternate table name. Default: 'TestTable'");
+    System.err.println(" multiGet        If >0, when doing RandomRead, perform multiple gets " +
+      "instead of single gets. Default: 0");
     System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
     System.err.println(" flushCommits    Used to determine if the test should flush the table. " +
       "Default: false");
     System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
+    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
+    System.err.println(" oneCon          all the threads share the same connection. Default: False");
     System.err.println(" presplit        Create presplit table. Recommended for accurate perf " +
       "analysis (see guide).  Default: disabled");
     System.err.println(" inmemory        Tries to keep the HFiles of the CF " +
@@ -1131,18 +1786,30 @@ public class PerformanceEvaluation extends Configured implements Tool {
     System.err.println(" filterAll       Helps to filter out all the rows on the server side"
         + " there by not returning any thing back to the client.  Helps to check the server side"
         + " performance.  Uses FilterAllFilter internally. ");
-    System.err.println(" latency         Set to report operation latencies. " +
-      "Currently only supported by randomRead test. Default: False");
+    System.err.println(" latency         Set to report operation latencies. Default: False");
     System.err.println(" bloomFilter      Bloom filter type, one of " + Arrays.toString(BloomType.values()));
+    System.err.println(" valueSize       Pass value size to use: Default: 1024");
+    System.err.println(" valueRandom     Set if we should vary value size between 0 and " +
+        "'valueSize'; set on read for stats on size: Default: Not set.");
+    System.err.println(" valueZipf       Set if we should vary value size between 0 and " +
+        "'valueSize' in zipf form: Default: Not set.");
+    System.err.println(" period          Report every 'period' rows: " +
+      "Default: opts.perClientRunRows / 10");
+    System.err.println(" multiGet        Batch gets together into groups of N. Only supported " +
+      "by randomRead. Default: disabled");
+    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
+    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
+    System.err.println(" randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
+    System.err.println(" columns         Columns to write per row. Default: 1");
+    System.err.println(" caching         Scan caching to use. Default: 30");
     System.err.println();
     System.err.println(" Note: -D properties will be applied to the conf used. ");
     System.err.println("  For example: ");
-    System.err.println("   -Dmapred.output.compress=true");
-    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
+    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
     System.err.println("   -Dmapreduce.task.timeout=60000");
     System.err.println();
     System.err.println("Command:");
-    for (CmdDescriptor command : commands.values()) {
+    for (CmdDescriptor command : COMMANDS.values()) {
       System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
     }
     System.err.println();
@@ -1151,163 +1818,282 @@ public class PerformanceEvaluation extends Configured implements Tool {
       "clients (and HRegionServers)");
     System.err.println("                 running: 1 <= value <= 500");
     System.err.println("Examples:");
-    System.err.println(" To run a single evaluation client:");
-    System.err.println(" $ bin/hbase " + this.getClass().getName()
-        + " sequentialWrite 1");
+    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
+    System.err.println(" $ bin/hbase " + className + " sequentialWrite 1");
+    System.err.println(" To run 10 clients doing increments over ten rows:");
+    System.err.println(" $ bin/hbase " + className + " --rows=10 --nomapred increment 10");
   }
 
-  private static int getNumClients(final int start, final String[] args) {
-    if(start + 1 > args.length) {
-      throw new IllegalArgumentException("must supply the number of clients");
-    }
-    int N = Integer.parseInt(args[start]);
-    if (N < 1) {
-      throw new IllegalArgumentException("Number of clients must be > 1");
-    }
-    return N;
-  }
+  /**
+   * Parse options passed in via an arguments array. Assumes that array has been split
+   * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
+   * in the queue at the conclusion of this method call. It's up to the caller to deal
+   * with these unrecognized arguments.
+   */
+  static TestOptions parseOpts(Queue<String> args) {
+    TestOptions opts = new TestOptions();
+
+    String cmd = null;
+    while ((cmd = args.poll()) != null) {
+      if (cmd.equals("-h") || cmd.startsWith("--h")) {
+        // place item back onto queue so that caller knows parsing was incomplete
+        args.add(cmd);
+        break;
+      }
 
-  public int run(String[] args) throws Exception {
-    // Process command-line args. TODO: Better cmd-line processing
-    // (but hopefully something not as painful as cli options).
-    int errCode = -1;
-    if (args.length < 1) {
-      printUsage();
-      return errCode;
-    }
+      final String nmr = "--nomapred";
+      if (cmd.startsWith(nmr)) {
+        opts.nomapred = true;
+        continue;
+      }
 
-    try {
-      // MR-NOTE: if you are adding a property that is used to control an operation
-      // like put(), get(), scan(), ... you must also add it as part of the MR 
-      // input, take a look at writeInputFile().
-      // Then you must adapt the LINE_PATTERN input regex,
-      // and parse the argument, take a look at PEInputFormat.getSplits().
-
-      TestOptions opts = new TestOptions();
-
-      for (int i = 0; i < args.length; i++) {
-        String cmd = args[i];
-        if (cmd.equals("-h") || cmd.startsWith("--h")) {
-          printUsage();
-          errCode = 0;
-          break;
-        }
+      final String rows = "--rows=";
+      if (cmd.startsWith(rows)) {
+        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
+        continue;
+      }
 
-        final String nmr = "--nomapred";
-        if (cmd.startsWith(nmr)) {
-          opts.nomapred = true;
-          continue;
-        }
+      final String sampleRate = "--sampleRate=";
+      if (cmd.startsWith(sampleRate)) {
+        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
+        continue;
+      }
 
-        final String rows = "--rows=";
-        if (cmd.startsWith(rows)) {
-          opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
-          continue;
-        }
+      final String table = "--table=";
+      if (cmd.startsWith(table)) {
+        opts.tableName = cmd.substring(table.length());
+        continue;
+      }
 
-        final String sampleRate = "--sampleRate=";
-        if (cmd.startsWith(sampleRate)) {
-          opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
-          continue;
-        }
+      final String startRow = "--startRow=";
+      if (cmd.startsWith(startRow)) {
+        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
+        continue;
+      }
 
-        final String table = "--table=";
-        if (cmd.startsWith(table)) {
-          opts.tableName = cmd.substring(table.length());
-          continue;
-        }
+      final String compress = "--compress=";
+      if (cmd.startsWith(compress)) {
+        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
+        continue;
+      }
 
-        final String compress = "--compress=";
-        if (cmd.startsWith(compress)) {
-          opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
-          continue;
-        }
+      final String traceRate = "--traceRate=";
+      if (cmd.startsWith(traceRate)) {
+        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
+        continue;
+      }
 
-        final String blockEncoding = "--blockEncoding=";
-        if (cmd.startsWith(blockEncoding)) {
-          opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
-          continue;
-        }
+      final String blockEncoding = "--blockEncoding=";
+      if (cmd.startsWith(blockEncoding)) {
+        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
+        continue;
+      }
 
-        final String flushCommits = "--flushCommits=";
-        if (cmd.startsWith(flushCommits)) {
-          opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
-          continue;
-        }
+      final String flushCommits = "--flushCommits=";
+      if (cmd.startsWith(flushCommits)) {
+        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
+        continue;
+      }
 
-        final String writeToWAL = "--writeToWAL=";
-        if (cmd.startsWith(writeToWAL)) {
-          opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
-          continue;
-        }
+      final String writeToWAL = "--writeToWAL=";
+      if (cmd.startsWith(writeToWAL)) {
+        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
+        continue;
+      }
 
-        final String presplit = "--presplit=";
-        if (cmd.startsWith(presplit)) {
-          opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
-          continue;
-        }
+      final String presplit = "--presplit=";
+      if (cmd.startsWith(presplit)) {
+        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
+        continue;
+      }
 
-        final String inMemory = "--inmemory=";
-        if (cmd.startsWith(inMemory)) {
-          opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
-          continue;
-        }
+      final String inMemory = "--inmemory=";
+      if (cmd.startsWith(inMemory)) {
+        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
+        continue;
+      }
 
-        final String latency = "--latency";
-        if (cmd.startsWith(latency)) {
-          opts.reportLatency = true;
-          continue;
-        }
+      final String autoFlush = "--autoFlush=";
+      if (cmd.startsWith(autoFlush)) {
+        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
+        continue;
+      }
 
-        final String multiGet = "--multiGet=";
-        if (cmd.startsWith(multiGet)) {
-          opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
-          continue;
-        }
+      final String onceCon = "--oneCon=";
+      if (cmd.startsWith(onceCon)) {
+        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
+        continue;
+      }
 
-        final String useTags = "--usetags=";
-        if (cmd.startsWith(useTags)) {
-          opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
-          continue;
-        }
-        
-        final String noOfTags = "--numoftags=";
-        if (cmd.startsWith(noOfTags)) {
-          opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
-          continue;
-        }
+      final String latency = "--latency";
+      if (cmd.startsWith(latency)) {
+        opts.reportLatency = true;
+        continue;
+      }
 
-        final String filterOutAll = "--filterAll";
-        if (cmd.startsWith(filterOutAll)) {
-          opts.filterAll = true;
-          continue;
-        }
+      final String multiGet = "--multiGet=";
+      if (cmd.startsWith(multiGet)) {
+        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
+        continue;
+      }
+
+      final String useTags = "--usetags=";
+      if (cmd.startsWith(useTags)) {
+        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
+        continue;
+      }
+
+      final String noOfTags = "--numoftags=";
+      if (cmd.startsWith(noOfTags)) {
+        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
+        continue;
+      }
+
+      final String filterOutAll = "--filterAll";
+      if (cmd.startsWith(filterOutAll)) {
+        opts.filterAll = true;
+        continue;
+      }
+
+      final String size = "--size=";
+      if (cmd.startsWith(size)) {
+        opts.size = Float.parseFloat(cmd.substring(size.length()));
+        continue;
+      }
+
+      final String splitPolicy = "--splitPolicy=";
+      if (cmd.startsWith(splitPolicy)) {
+        opts.splitPolicy = cmd.substring(splitPolicy.length());
+        continue;
+      }
+
+      final String randomSleep = "--randomSleep=";
+      if (cmd.startsWith(randomSleep)) {
+        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
+        continue;
+      }
 
-        final String bloomFilter = "--bloomFilter";
-        if (cmd.startsWith(bloomFilter)) {
-          opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
-          continue;
+      final String bloomFilter = "--bloomFilter=";
+      if (cmd.startsWith(bloomFilter)) {
+        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
+        continue;
+      }
+
+      final String valueSize = "--valueSize=";
+      if (cmd.startsWith(valueSize)) {
+        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
+        continue;
+      }
+
+      final String valueRandom = "--valueRandom";
+      if (cmd.startsWith(valueRandom)) {
+        opts.valueRandom = true;
+        if (opts.valueZipf) {
+          throw new IllegalStateException("Either valueZipf or valueRandom but not both");
         }
+        continue;
+      }
 
-        final String addColumns = "--addColumns=";
-        if (cmd.startsWith(addColumns)) {
-          opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
-          continue;
+      final String valueZipf = "--valueZipf";
+      if (cmd.startsWith(valueZipf)) {
+        opts.valueZipf = true;
+        if (opts.valueRandom) {
+          throw new IllegalStateException("Either valueZipf or valueRandom but not both");
         }
+        continue;
+      }
+
+      final String period = "--period=";
+      if (cmd.startsWith(period)) {
+        opts.period = Integer.parseInt(cmd.substring(period.length()));
+        continue;
+      }
+
+      final String addColumns = "--addColumns=";
+      if (cmd.startsWith(addColumns)) {
+        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
+        continue;
+      }
+
+      final String columns = "--columns=";
+      if (cmd.startsWith(columns)) {
+        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
+        continue;
+      }
+
+      final String caching = "--caching=";
+      if (cmd.startsWith(caching)) {
+        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
+        continue;
+      }
 
-        Class<? extends Test> cmdClass = determineCommandClass(cmd);
-        if (cmdClass != null) {
-          opts.numClientThreads = getNumClients(i + 1, args);
+      if (isCommandClass(cmd)) {
+        opts.cmdName = cmd;
+        opts.numClientThreads = Integer.parseInt(args.remove());
+        int rowsPerGB = getRowsPerGB(opts);
+        if (opts.size != DEFAULT_OPTS.size &&
+            opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
+          throw new IllegalArgumentException(rows + " and " + size + " are mutually exclusive arguments.");
+        }
+        if (opts.size != DEFAULT_OPTS.size) {
+          // total size in GB specified
+          opts.totalRows = (int) opts.size * rowsPerGB;
+          opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
+        } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
           // number of rows specified
           opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
-          runTest(cmdClass, opts);
-          errCode = 0;
-          break;
+          opts.size = opts.totalRows / rowsPerGB;
         }
+        break;
+      } else {
+        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
+      }
+
+      // Not matching any option or command.
+      System.err.println("Error: Wrong option or command: " + cmd);
+      args.add(cmd);
+      break;
+    }
+    return opts;
+  }
+
+  static int getRowsPerGB(final TestOptions opts) {
+    return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getColumns());
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    // Process command-line args. TODO: Better cmd-line processing
+    // (but hopefully something not as painful as cli options).
+    int errCode = -1;
+    if (args.length < 1) {
+      printUsage();
+      return errCode;
+    }
 
+    try {
+      LinkedList<String> argv = new LinkedList<String>();
+      argv.addAll(Arrays.asList(args));
+      TestOptions opts = parseOpts(argv);
+
+      // args remaining, print help and exit
+      if (!argv.isEmpty()) {
+        errCode = 0;
         printUsage();
-        break;
+        return errCode;
       }
+
+      // must run at least 1 client
+      if (opts.numClientThreads <= 0) {
+        throw new IllegalArgumentException("Number of clients must be > 0");
+      }
+
+      Class<? extends Test> cmdClass = determineCommandClass(opts.cmdName);
+      if (cmdClass != null) {
+        runTest(cmdClass, opts);
+        errCode = 0;
+      }
+
     } catch (Exception e) {
       e.printStackTrace();
     }
@@ -1315,8 +2101,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
     return errCode;
   }
 
-  private Class<? extends Test> determineCommandClass(String cmd) {
-    CmdDescriptor descriptor = commands.get(cmd);
+  private static boolean isCommandClass(String cmd) {
+    return COMMANDS.containsKey(cmd);
+  }
+
+  private static Class<? extends Test> determineCommandClass(String cmd) {
+    CmdDescriptor descriptor = COMMANDS.get(cmd);
     return descriptor != null ? descriptor.getCmdClass() : null;
   }