You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/06/28 02:31:15 UTC

[29/49] git commit: HBASE-10791 Add integration test to demonstrate performance improvement

HBASE-10791 Add integration test to demonstrate performance improvement

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1585807 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d313103a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d313103a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d313103a

Branch: refs/heads/master
Commit: d313103aeb66bd6a8081f76578a35e26d5a0fc78
Parents: 7d24752
Author: ndimiduk <nd...@unknown>
Authored: Tue Apr 8 18:18:41 2014 +0000
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Jun 27 16:39:39 2014 -0700

----------------------------------------------------------------------
 .../hbase/IntegrationTestRegionReplicaPerf.java | 338 +++++++++++
 .../hadoop/hbase/PerformanceEvaluation.java     | 598 +++++++++++--------
 2 files changed, 678 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d313103a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
new file mode 100644
index 0000000..ca3a8f0
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
@@ -0,0 +1,338 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import com.google.common.base.Objects;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction;
+import org.apache.hadoop.hbase.chaos.actions.RestartRsHoldingTableAction;
+import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
+import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
+import org.apache.hadoop.hbase.chaos.policies.Policy;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import static java.lang.String.format;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for comparing the performance impact of region replicas. Uses
+ * components of {@link PerformanceEvaluation}. Does not run from
+ * {@code IntegrationTestsDriver} because IntegrationTestBase is incompatible
+ * with the JUnit runner. Hence no @Test annotations either. See {@code -help}
+ * for full list of options.
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
+
+  private static final Log LOG = LogFactory.getLog(IntegrationTestRegionReplicaPerf.class);
+
+  private static final String SLEEP_TIME_KEY = "sleeptime";
+  // short default interval because tests don't run very long.
+  private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000l);
+  private static final String TABLE_NAME_KEY = "tableName";
+  private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf";
+  private static final String NOMAPRED_KEY = "nomapred";
+  private static final boolean NOMAPRED_DEFAULT = false;
+  private static final String REPLICA_COUNT_KEY = "replicas";
+  private static final String REPLICA_COUNT_DEFAULT = "" + 3;
+  private static final String PRIMARY_TIMEOUT_KEY = "timeout";
+  private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms
+  private static final String NUM_RS_KEY = "numRs";
+  private static final String NUM_RS_DEFAULT = "" + 3;
+
+  private TableName tableName;
+  private long sleepTime;
+  private boolean nomapred = NOMAPRED_DEFAULT;
+  private int replicaCount;
+  private int primaryTimeout;
+  private int clusterSize;
+
+  /**
+   * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}.
+   */
+  static class PerfEvalCallable implements Callable<TimingResult> {
+    private final Queue<String> argv = new LinkedList<String>();
+    private final HBaseAdmin admin;
+
+    public PerfEvalCallable(HBaseAdmin admin, String argv) {
+      // TODO: this API is awkward, should take HConnection, not HBaseAdmin
+      this.admin = admin;
+      this.argv.addAll(Arrays.asList(argv.split(" ")));
+      LOG.debug("Created PerformanceEvaluationCallable with args: " + argv);
+    }
+
+    @Override
+    public TimingResult call() throws Exception {
+      PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
+      PerformanceEvaluation.checkTable(admin, opts);
+      long numRows = opts.totalRows;
+      long elapsedTime;
+      if (opts.nomapred) {
+        elapsedTime = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
+      } else {
+        Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
+        Counters counters = job.getCounters();
+        numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
+        elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
+      }
+      return new TimingResult(numRows, elapsedTime);
+    }
+  }
+
+  /**
+   * Record the results from a single {@link PerformanceEvaluation} job run.
+   */
+  static class TimingResult {
+    public long numRows;
+    public long elapsedTime;
+
+    public TimingResult(long numRows, long elapsedTime) {
+      this.numRows = numRows;
+      this.elapsedTime = elapsedTime;
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+        .add("numRows", numRows)
+        .add("elapsedTime", elapsedTime)
+        .toString();
+    }
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = util.getConfiguration();
+
+    // sanity check cluster
+    // TODO: this should reach out to master and verify online state instead
+    assertEquals("Master must be configured with StochasticLoadBalancer",
+      "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer",
+      conf.get("hbase.master.loadbalancer.class"));
+    // TODO: this should reach out to master and verify online state instead
+    assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.",
+      conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0);
+
+    // enable client-side settings
+    conf.setBoolean(RpcClient.ALLOWS_INTERRUPTS, true);
+    // TODO: expose these settings to CLI override
+    conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout);
+    conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout);
+  }
+
+  @Override
+  public void setUpCluster() throws Exception {
+    util = getTestingUtil(getConf());
+    util.initializeCluster(clusterSize);
+  }
+
+  @Override
+  public void setUpMonkey() throws Exception {
+    Policy p = new PeriodicRandomActionPolicy(sleepTime,
+      new RestartRsHoldingTableAction(sleepTime, tableName.getNameAsString()),
+      new MoveRandomRegionOfTableAction(tableName.getNameAsString()));
+    this.monkey = new PolicyBasedChaosMonkey(util, p);
+    // don't start monkey right away
+  }
+
+  @Override
+  protected void addOptions() {
+    addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '"
+      + TABLE_NAME_DEFAULT + "'");
+    addOptWithArg(SLEEP_TIME_KEY, "How long the monkey sleeps between actions. Default: "
+      + SLEEP_TIME_DEFAULT);
+    addOptNoArg(NOMAPRED_KEY,
+      "Run multiple clients using threads (rather than use mapreduce)");
+    addOptWithArg(REPLICA_COUNT_KEY, "Number of region replicas. Default: "
+      + REPLICA_COUNT_DEFAULT);
+    addOptWithArg(PRIMARY_TIMEOUT_KEY, "Overrides hbase.client.primaryCallTimeout. Default: "
+      + PRIMARY_TIMEOUT_DEFAULT + " (10ms)");
+    addOptWithArg(NUM_RS_KEY, "Specify the number of RegionServers to use. Default: "
+      + NUM_RS_DEFAULT);
+  }
+
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT));
+    sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT));
+    nomapred = cmd.hasOption(NOMAPRED_KEY);
+    replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT));
+    primaryTimeout =
+      Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT));
+    clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT));
+    LOG.debug(Objects.toStringHelper("Parsed Options")
+      .add(TABLE_NAME_KEY, tableName)
+      .add(SLEEP_TIME_KEY, sleepTime)
+      .add(NOMAPRED_KEY, nomapred)
+      .add(REPLICA_COUNT_KEY, replicaCount)
+      .add(PRIMARY_TIMEOUT_KEY, primaryTimeout)
+      .add(NUM_RS_KEY, clusterSize)
+      .toString());
+  }
+
+  @Override
+  public int runTestFromCommandLine() throws Exception {
+    test();
+    return 0;
+  }
+
+  @Override
+  public String getTablename() {
+    return tableName.getNameAsString();
+  }
+
+  @Override
+  protected Set<String> getColumnFamilies() {
+    return null;
+  }
+
+  /**
+   * Modify a table, synchronous. Waiting logic similar to that of {@code admin.rb#alter_status}.
+   */
+  private static void modifyTableSync(HBaseAdmin admin, HTableDescriptor desc) throws Exception {
+    admin.modifyTable(desc.getTableName(), desc);
+    Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
+      setFirst(0);
+      setSecond(0);
+    }};
+    for (int i = 0; status.getFirst() != 0 && i < 500; i++) { // wait up to 500 seconds
+      status = admin.getAlterStatus(desc.getTableName());
+      if (status.getSecond() != 0) {
+        LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
+          + " regions updated.");
+        Thread.sleep(1 * 1000l);
+      } else {
+        LOG.debug("All regions updated.");
+      }
+    }
+    if (status.getSecond() != 0) {
+      throw new Exception("Failed to update replica count after 500 seconds.");
+    }
+  }
+
+  /**
+   * Set the number of Region replicas.
+   */
+  private static void setReplicas(HBaseAdmin admin, TableName table, int replicaCount)
+      throws Exception {
+    admin.disableTable(table);
+    HTableDescriptor desc = admin.getTableDescriptor(table);
+    desc.setRegionReplication(replicaCount);
+    modifyTableSync(admin, desc);
+    admin.enableTable(table);
+  }
+
+  public void test() throws Exception {
+    int maxIters = 3;
+    String mr = nomapred ? "--nomapred" : "";
+    String replicas = "--replicas=" + replicaCount;
+    // TODO: splits disabled until "phase 2" is complete.
+    String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName();
+    String writeOpts = format("%s %s --table=%s --presplit=16 sequentialWrite 4",
+      mr, splitPolicy, tableName);
+    String readOpts =
+      format("%s --table=%s --latency --sampleRate=0.1 randomRead 4", mr, tableName);
+    String replicaReadOpts = format("%s %s", replicas, readOpts);
+
+    ArrayList<TimingResult> resultsWithoutReplica = new ArrayList<TimingResult>(maxIters);
+    ArrayList<TimingResult> resultsWithReplica = new ArrayList<TimingResult>(maxIters);
+
+    // create/populate the table, replicas disabled
+    LOG.debug("Populating table.");
+    new PerfEvalCallable(util.getHBaseAdmin(), writeOpts).call();
+
+    // one last sanity check, then send in the clowns!
+    assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.",
+      DisabledRegionSplitPolicy.class.getName(),
+      util.getHBaseAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName());
+    startMonkey();
+
+    // collect a baseline without region replicas.
+    for (int i = 0; i < maxIters; i++) {
+      LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters);
+      resultsWithoutReplica.add(new PerfEvalCallable(util.getHBaseAdmin(), readOpts).call());
+      // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
+      Thread.sleep(5000l);
+    }
+
+    // disable monkey, enable region replicas, enable monkey
+    cleanUpMonkey("Altering table.");
+    LOG.debug("Altering " + tableName + " replica count to " + replicaCount);
+    setReplicas(util.getHBaseAdmin(), tableName, replicaCount);
+    setUpMonkey();
+    startMonkey();
+
+    // run test with region replicas.
+    for (int i = 0; i < maxIters; i++) {
+      LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters);
+      resultsWithReplica.add(new PerfEvalCallable(util.getHBaseAdmin(), replicaReadOpts).call());
+      // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
+      Thread.sleep(5000l);
+    }
+
+    DescriptiveStatistics withoutReplicaStats = new DescriptiveStatistics();
+    for (TimingResult tr : resultsWithoutReplica) {
+      withoutReplicaStats.addValue(tr.elapsedTime);
+    }
+    DescriptiveStatistics withReplicaStats = new DescriptiveStatistics();
+    for (TimingResult tr : resultsWithReplica) {
+      withReplicaStats.addValue(tr.elapsedTime);
+    }
+
+    LOG.info(Objects.toStringHelper("testName")
+      .add("withoutReplicas", resultsWithoutReplica)
+      .add("withReplicas", resultsWithReplica)
+      .add("withoutReplicasMean", withoutReplicaStats.getMean())
+      .add("withReplicasMean", withReplicaStats.getMean())
+      .toString());
+
+    assertTrue(
+      "Running with region replicas under chaos should be as fast or faster than without. "
+      + "withReplicas.mean: " + withReplicaStats.getMean() + "ms "
+      + "withoutReplicas.mean: " + withoutReplicaStats.getMean() + "ms.",
+      withReplicaStats.getMean() <= withoutReplicaStats.getMean());
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    IntegrationTestingUtility.setUseDistributedCluster(conf);
+    int status = ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args);
+    System.exit(status);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d313103a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index 227fc1a..d9fa0b7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -30,7 +30,9 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
@@ -39,12 +41,15 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -84,7 +89,6 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.codehaus.jackson.map.ObjectMapper;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.yammer.metrics.core.Histogram;
 import com.yammer.metrics.stats.UniformSample;
 import com.yammer.metrics.stats.Snapshot;
@@ -99,16 +103,17 @@ import org.htrace.impl.ProbabilitySampler;
  * client that steps through one of a set of hardcoded tests or 'experiments'
  * (e.g. a random reads test, a random writes test, etc.). Pass on the
  * command-line which test to run and how many clients are participating in
- * this experiment. Run <code>java PerformanceEvaluation --help</code> to
- * obtain usage.
+ * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
  *
  * <p>This class sets up and runs the evaluation programs described in
  * Section 7, <i>Performance Evaluation</i>, of the <a
  * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
  * paper, pages 8-10.
  *
- * <p>If number of clients > 1, we start up a MapReduce job. Each map task
- * runs an individual client. Each client does about 1GB of data.
+ * <p>By default, runs as a mapreduce job where each mapper runs a single test
+ * client. Can also run as a non-mapreduce, multithreaded application by
+ * specifying {@code --nomapred}. Each client does about 1GB of data, unless
+ * specified otherwise.
  */
 public class PerformanceEvaluation extends Configured implements Tool {
   protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
@@ -133,10 +138,35 @@ public class PerformanceEvaluation extends Configured implements Tool {
   private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
   private static final TestOptions DEFAULT_OPTS = new TestOptions();
 
-  protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
-
+  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<String, CmdDescriptor>();
   private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
 
+  static {
+    addCommandDescriptor(RandomReadTest.class, "randomRead",
+      "Run random read test");
+    addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
+      "Run random seek and scan 100 test");
+    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
+      "Run random seek scan with both start and stop row (max 10 rows)");
+    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
+      "Run random seek scan with both start and stop row (max 100 rows)");
+    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
+      "Run random seek scan with both start and stop row (max 1000 rows)");
+    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
+      "Run random seek scan with both start and stop row (max 10000 rows)");
+    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
+      "Run random write test");
+    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
+      "Run sequential read test");
+    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
+      "Run sequential write test");
+    addCommandDescriptor(ScanTest.class, "scan",
+      "Run scan test (read every row)");
+    addCommandDescriptor(FilteredScanTest.class, "filterScan",
+      "Run scan test using a filter to find a specific row based on it's value " +
+        "(make sure to use --rows=20)");
+  }
+
   /**
    * Enum for map metrics.  Keep it out here rather than inside in the Map
    * inner-class so we can find associated properties.
@@ -154,37 +184,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
    */
   public PerformanceEvaluation(final Configuration conf) {
     super(conf);
-
-    addCommandDescriptor(RandomReadTest.class, "randomRead",
-        "Run random read test");
-    addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
-        "Run random seek and scan 100 test");
-    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
-        "Run random seek scan with both start and stop row (max 10 rows)");
-    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
-        "Run random seek scan with both start and stop row (max 100 rows)");
-    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
-        "Run random seek scan with both start and stop row (max 1000 rows)");
-    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
-        "Run random seek scan with both start and stop row (max 10000 rows)");
-    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
-        "Run random write test");
-    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
-        "Run sequential read test");
-    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
-        "Run sequential write test");
-    addCommandDescriptor(ScanTest.class, "scan",
-        "Run scan test (read every row)");
-    addCommandDescriptor(FilteredScanTest.class, "filterScan",
-        "Run scan test using a filter to find a specific row based on it's value " +
-        "(make sure to use --rows=20)");
   }
 
-  protected void addCommandDescriptor(Class<? extends Test> cmdClass,
+  protected static void addCommandDescriptor(Class<? extends Test> cmdClass,
       String name, String description) {
-    CmdDescriptor cmdDescriptor =
-      new CmdDescriptor(cmdClass, name, description);
-    commands.put(name, cmdDescriptor);
+    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
+    COMMANDS.put(name, cmdDescriptor);
   }
 
   /**
@@ -235,10 +240,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
       }
     }
 
+    @Override
     protected void map(LongWritable key, Text value, final Context context)
            throws IOException, InterruptedException {
 
       Status status = new Status() {
+        @Override
         public void setStatus(String msg) {
            context.setStatus(msg);
         }
@@ -260,35 +267,62 @@ public class PerformanceEvaluation extends Configured implements Tool {
   }
 
   /*
-   * If table does not already exist, create.
-   * @param c Client to use checking.
-   * @return True if we created the table.
-   * @throws IOException
+   * If table does not already exist, create. Also create a table when
+   * {@code opts.presplitRegions} is specified or when the existing table's
+   * region replica count doesn't match {@code opts.replicas}.
    */
-  private static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
-    HTableDescriptor tableDescriptor = getTableDescriptor(opts);
-    if (opts.presplitRegions > 0) {
-      // presplit requested
-      if (admin.tableExists(tableDescriptor.getTableName())) {
-        admin.disableTable(tableDescriptor.getTableName());
-        admin.deleteTable(tableDescriptor.getTableName());
+  static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
+    TableName tableName = TableName.valueOf(opts.tableName);
+    boolean needsDelete = false, exists = admin.tableExists(tableName);
+    boolean isReadCmd = opts.cmdName.toLowerCase().contains("read")
+      || opts.cmdName.toLowerCase().contains("scan");
+    if (!exists && isReadCmd) {
+      throw new IllegalStateException(
+        "Must specify an existing table for read commands. Run a write command first.");
+    }
+    HTableDescriptor desc =
+      exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null;
+    byte[][] splits = getSplits(opts);
+
+    // recreate the table when user has requested presplit or when existing
+    // {RegionSplitPolicy,replica count} does not match requested.
+    if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
+      || (!isReadCmd && desc != null && desc.getRegionSplitPolicyClassName() != opts.splitPolicy)
+      || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)) {
+      needsDelete = true;
+      // wait, why did it delete my table?!?
+      LOG.debug(Objects.toStringHelper("needsDelete")
+        .add("needsDelete", needsDelete)
+        .add("isReadCmd", isReadCmd)
+        .add("exists", exists)
+        .add("desc", desc)
+        .add("presplit", opts.presplitRegions)
+        .add("splitPolicy", opts.splitPolicy)
+        .add("replicas", opts.replicas));
+    }
+
+    // remove an existing table
+    if (needsDelete) {
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName);
       }
+      admin.deleteTable(tableName);
+    }
 
-      byte[][] splits = getSplits(opts);
-      for (int i=0; i < splits.length; i++) {
-        LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
-      }
-      admin.createTable(tableDescriptor, splits);
-      LOG.info ("Table created with " + opts.presplitRegions + " splits");
-    }
-    else {
-      boolean tableExists = admin.tableExists(tableDescriptor.getTableName());
-      if (!tableExists) {
-        admin.createTable(tableDescriptor);
-        LOG.info("Table " + tableDescriptor + " created");
+    // table creation is necessary
+    if (!exists || needsDelete) {
+      desc = getTableDescriptor(opts);
+      if (splits != null) {
+        if (LOG.isDebugEnabled()) {
+          for (int i = 0; i < splits.length; i++) {
+            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
+          }
+        }
       }
+      admin.createTable(desc, splits);
+      LOG.info("Table " + desc + " created");
     }
-    return admin.tableExists(tableDescriptor.getTableName());
+    return admin.tableExists(tableName);
   }
 
   /**
@@ -304,6 +338,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
       family.setInMemory(true);
     }
     desc.addFamily(family);
+    if (opts.replicas != DEFAULT_OPTS.replicas) {
+      desc.setRegionReplication(opts.replicas);
+    }
+    if (opts.splitPolicy != DEFAULT_OPTS.splitPolicy) {
+      desc.setRegionSplitPolicyClassName(opts.splitPolicy);
+    }
     return desc;
   }
 
@@ -311,8 +351,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
    * generates splits based on total number of rows and specified split regions
    */
   protected static byte[][] getSplits(TestOptions opts) {
-    if (opts.presplitRegions == 0)
-      return new byte [0][];
+    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
+      return null;
 
     int numSplitPoints = opts.presplitRegions - 1;
     byte[][] splits = new byte[numSplitPoints][];
@@ -329,8 +369,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
    * @param cmd Command to run.
    * @throws IOException
    */
-  private void doLocalClients(final Class<? extends Test> cmd, final TestOptions opts)
+  static long doLocalClients(final TestOptions opts, final Configuration conf)
       throws IOException, InterruptedException {
+    final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
+    assert cmd != null;
     Future<Long>[] threads = new Future[opts.numClientThreads];
     long[] timings = new long[opts.numClientThreads];
     ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
@@ -342,7 +384,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
         public Long call() throws Exception {
           TestOptions threadOpts = new TestOptions(opts);
           if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
-          long elapsedTime = runOneClient(cmd, getConf(), threadOpts, new Status() {
+          long elapsedTime = runOneClient(cmd, conf, threadOpts, new Status() {
+            @Override
             public void setStatus(final String msg) throws IOException {
               LOG.info(msg);
             }
@@ -370,9 +413,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
       total += timing;
     }
     LOG.info("[" + test + "]"
-             + "\tMin: " + timings[0] + "ms"
-             + "\tMax: " + timings[timings.length - 1] + "ms"
-             + "\tAvg: " + (total / timings.length) + "ms");
+      + "\tMin: " + timings[0] + "ms"
+      + "\tMax: " + timings[timings.length - 1] + "ms"
+      + "\tAvg: " + (total / timings.length) + "ms");
+    return total;
   }
 
   /*
@@ -382,15 +426,16 @@ public class PerformanceEvaluation extends Configured implements Tool {
    * @param cmd Command to run.
    * @throws IOException
    */
-  private void doMapReduce(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
-        InterruptedException, ClassNotFoundException {
-    Configuration conf = getConf();
+  static Job doMapReduce(TestOptions opts, final Configuration conf)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
+    assert cmd != null;
     Path inputDir = writeInputFile(conf, opts);
     conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
-    conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
+    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
     Job job = new Job(conf);
     job.setJarByClass(PerformanceEvaluation.class);
-    job.setJobName("HBase Performance Evaluation");
+    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
 
     job.setInputFormatClass(NLineInputFormat.class);
     NLineInputFormat.setInputPaths(job, inputDir);
@@ -410,12 +455,13 @@ public class PerformanceEvaluation extends Configured implements Tool {
 
     TableMapReduceUtil.addDependencyJars(job);
     TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
-      Histogram.class,     // yammer metrics   
+      Histogram.class,     // yammer metrics
       ObjectMapper.class); // jackson-mapper-asl
 
     TableMapReduceUtil.initCredentials(job);
 
     job.waitForCompletion(true);
+    return job;
   }
 
   /*
@@ -424,7 +470,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
    * @return Directory that contains file written.
    * @throws IOException
    */
-  private Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
+  private static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
     SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
     Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
     Path inputDir = new Path(jobdir, "inputs");
@@ -491,6 +537,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
    * This makes tracking all these arguments a little easier.
    */
   static class TestOptions {
+    String cmdName = null;
     boolean nomapred = false;
     boolean filterAll = false;
     int startRow = 0;
@@ -511,6 +558,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
     int multiGet = 0;
     boolean inMemoryCF = false;
     int presplitRegions = 0;
+    int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
+    String splitPolicy = null;
     Compression.Algorithm compression = Compression.Algorithm.NONE;
     BloomType bloomType = BloomType.ROW;
     DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
@@ -521,6 +570,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     public TestOptions() {}
 
     public TestOptions(TestOptions that) {
+      this.cmdName = that.cmdName;
       this.nomapred = that.nomapred;
       this.startRow = that.startRow;
       this.size = that.size;
@@ -540,6 +590,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
       this.multiGet = that.multiGet;
       this.inMemoryCF = that.inMemoryCF;
       this.presplitRegions = that.presplitRegions;
+      this.replicas = that.replicas;
+      this.splitPolicy = that.splitPolicy;
       this.compression = that.compression;
       this.blockEncoding = that.blockEncoding;
       this.filterAll = that.filterAll;
@@ -712,7 +764,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
       if (rs == null || !isRandomValueSize()) return;
       for (Result r: rs) updateValueSize(r);
     }
- 
+
     void updateValueSize(final Result r) throws IOException {
       if (r == null || !isRandomValueSize()) return;
       int size = 0;
@@ -731,7 +783,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
       return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
         (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
     }
- 
+
     boolean isRandomValueSize() {
       return opts.valueRandom;
     }
@@ -825,7 +877,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
           valueSize.count() + " measures");
       reportHistogram(this.valueSize);
     }
- 
+
     private void reportHistogram(final Histogram h) throws IOException {
       Snapshot sn = h.getSnapshot();
       status.setStatus(testName + " Min      = " + h.min());
@@ -997,10 +1049,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
   }
 
   static class RandomReadTest extends Test {
+    private final Consistency consistency;
     private ArrayList<Get> gets;
 
     RandomReadTest(HConnection con, TestOptions options, Status status) {
       super(con, options, status);
+      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
       if (opts.multiGet > 0) {
         LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
         this.gets = new ArrayList<Get>(opts.multiGet);
@@ -1014,6 +1068,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
       if (opts.filterAll) {
         get.setFilter(new FilterAllFilter());
       }
+      get.setConsistency(consistency);
       if (LOG.isTraceEnabled()) LOG.trace(get.toString());
       if (opts.multiGet > 0) {
         this.gets.add(get);
@@ -1313,9 +1368,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
       if (admin != null) admin.close();
     }
     if (opts.nomapred) {
-      doLocalClients(cmd, opts);
+      doLocalClients(opts, getConf());
     } else {
-      doMapReduce(cmd, opts);
+      doMapReduce(opts, getConf());
     }
   }
 
@@ -1368,6 +1423,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
         "'valueSize'; set on read for stats on size: Default: Not set.");
     System.err.println(" period          Report every 'period' rows: " +
       "Default: opts.perClientRunRows / 10");
+    System.err.println(" multiGet        Batch gets together into groups of N. Only supported " +
+      "by randomRead. Default: disabled");
+    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
+    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
     System.err.println();
     System.err.println(" Note: -D properties will be applied to the conf used. ");
     System.err.println("  For example: ");
@@ -1375,7 +1434,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     System.err.println("   -Dmapreduce.task.timeout=60000");
     System.err.println();
     System.err.println("Command:");
-    for (CmdDescriptor command : commands.values()) {
+    for (CmdDescriptor command : COMMANDS.values()) {
       System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
     }
     System.err.println();
@@ -1389,215 +1448,234 @@ public class PerformanceEvaluation extends Configured implements Tool {
         + " sequentialWrite 1");
   }
 
-  private static int getNumClients(final int start, final String[] args) {
-    if(start + 1 > args.length) {
-      throw new IllegalArgumentException("must supply the number of clients");
-    }
-    int N = Integer.parseInt(args[start]);
-    if (N < 1) {
-      throw new IllegalArgumentException("Number of clients must be > 1");
-    }
-    return N;
-  }
+  /**
+   * Parse options passed in via an arguments array. Assumes that array has been split
+   * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
+   * in the queue at the conclusion of this method call. It's up to the caller to deal
+   * with these unrecognized arguments.
+   */
+  static TestOptions parseOpts(Queue<String> args) {
+    TestOptions opts = new TestOptions();
+
+    String cmd = null;
+    while ((cmd = args.poll()) != null) {
+      if (cmd.equals("-h") || cmd.startsWith("--h")) {
+        // place item back onto queue so that caller knows parsing was incomplete
+        args.add(cmd);
+        break;
+      }
 
-  public int run(String[] args) throws Exception {
-    // Process command-line args. TODO: Better cmd-line processing
-    // (but hopefully something not as painful as cli options).
-    int errCode = -1;
-    if (args.length < 1) {
-      printUsage();
-      return errCode;
-    }
+      final String nmr = "--nomapred";
+      if (cmd.startsWith(nmr)) {
+        opts.nomapred = true;
+        continue;
+      }
 
-    try {
-      // MR-NOTE: if you are adding a property that is used to control an operation
-      // like put(), get(), scan(), ... you must also add it as part of the MR 
-      // input, take a look at writeInputFile().
-      // Then you must adapt the LINE_PATTERN input regex,
-      // and parse the argument, take a look at PEInputFormat.getSplits().
-
-      TestOptions opts = new TestOptions();
-
-      for (int i = 0; i < args.length; i++) {
-        String cmd = args[i];
-        if (cmd.equals("-h") || cmd.startsWith("--h")) {
-          printUsage();
-          errCode = 0;
-          break;
-        }
+      final String rows = "--rows=";
+      if (cmd.startsWith(rows)) {
+        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
+        continue;
+      }
 
-        final String nmr = "--nomapred";
-        if (cmd.startsWith(nmr)) {
-          opts.nomapred = true;
-          continue;
-        }
+      final String sampleRate = "--sampleRate=";
+      if (cmd.startsWith(sampleRate)) {
+        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
+        continue;
+      }
 
-        final String rows = "--rows=";
-        if (cmd.startsWith(rows)) {
-          opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
-          continue;
-        }
+      final String table = "--table=";
+      if (cmd.startsWith(table)) {
+        opts.tableName = cmd.substring(table.length());
+        continue;
+      }
 
-        final String startRow = "--startRow=";
-        if (cmd.startsWith(startRow)) {
-          opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
-          continue;
-        }
+      final String startRow = "--startRow=";
+      if (cmd.startsWith(startRow)) {
+        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
+        continue;
+      }
 
-        final String sampleRate = "--sampleRate=";
-        if (cmd.startsWith(sampleRate)) {
-          opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
-          continue;
-        }
+      final String compress = "--compress=";
+      if (cmd.startsWith(compress)) {
+        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
+        continue;
+      }
 
-        final String traceRate = "--traceRate=";
-        if (cmd.startsWith(traceRate)) {
-          opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
-          continue;
-        }
+      final String traceRate = "--traceRate=";
+      if (cmd.startsWith(traceRate)) {
+        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
+        continue;
+      }
 
-        final String table = "--table=";
-        if (cmd.startsWith(table)) {
-          opts.tableName = cmd.substring(table.length());
-          continue;
-        }
+      final String blockEncoding = "--blockEncoding=";
+      if (cmd.startsWith(blockEncoding)) {
+        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
+        continue;
+      }
 
-        final String compress = "--compress=";
-        if (cmd.startsWith(compress)) {
-          opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
-          continue;
-        }
+      final String flushCommits = "--flushCommits=";
+      if (cmd.startsWith(flushCommits)) {
+        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
+        continue;
+      }
 
-        final String blockEncoding = "--blockEncoding=";
-        if (cmd.startsWith(blockEncoding)) {
-          opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
-          continue;
-        }
+      final String writeToWAL = "--writeToWAL=";
+      if (cmd.startsWith(writeToWAL)) {
+        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
+        continue;
+      }
 
-        final String flushCommits = "--flushCommits=";
-        if (cmd.startsWith(flushCommits)) {
-          opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
-          continue;
-        }
+      final String presplit = "--presplit=";
+      if (cmd.startsWith(presplit)) {
+        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
+        continue;
+      }
 
-        final String writeToWAL = "--writeToWAL=";
-        if (cmd.startsWith(writeToWAL)) {
-          opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
-          continue;
-        }
+      final String inMemory = "--inmemory=";
+      if (cmd.startsWith(inMemory)) {
+        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
+        continue;
+      }
 
-        final String autoFlush = "--autoFlush=";
-        if (cmd.startsWith(autoFlush)) {
-          opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
-          continue;
-        }
+      final String autoFlush = "--autoFlush=";
+      if (cmd.startsWith(autoFlush)) {
+        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
+        continue;
+      }
 
-        final String onceCon = "--oneCon=";
-        if (cmd.startsWith(onceCon)) {
-          opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
-          continue;
-        }
+      final String onceCon = "--oneCon=";
+      if (cmd.startsWith(onceCon)) {
+        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
+        continue;
+      }
 
-        final String presplit = "--presplit=";
-        if (cmd.startsWith(presplit)) {
-          opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
-          continue;
-        }
+      final String latency = "--latency";
+      if (cmd.startsWith(latency)) {
+        opts.reportLatency = true;
+        continue;
+      }
 
-        final String inMemory = "--inmemory=";
-        if (cmd.startsWith(inMemory)) {
-          opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
-          continue;
-        }
+      final String multiGet = "--multiGet=";
+      if (cmd.startsWith(multiGet)) {
+        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
+        continue;
+      }
 
-        final String latency = "--latency";
-        if (cmd.startsWith(latency)) {
-          opts.reportLatency = true;
-          continue;
-        }
+      final String useTags = "--usetags=";
+      if (cmd.startsWith(useTags)) {
+        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
+        continue;
+      }
 
-        final String multiGet = "--multiGet=";
-        if (cmd.startsWith(multiGet)) {
-          opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
-          continue;
-        }
+      final String noOfTags = "--nooftags=";
+      if (cmd.startsWith(noOfTags)) {
+        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
+        continue;
+      }
 
-        final String useTags = "--usetags=";
-        if (cmd.startsWith(useTags)) {
-          opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
-          continue;
-        }
+      final String replicas = "--replicas=";
+      if (cmd.startsWith(replicas)) {
+        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
+        continue;
+      }
 
-        final String noOfTags = "--nooftags=";
-        if (cmd.startsWith(noOfTags)) {
-          opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
-          continue;
-        }
+      final String filterOutAll = "--filterAll";
+      if (cmd.startsWith(filterOutAll)) {
+        opts.filterAll = true;
+        continue;
+      }
 
-        final String filterOutAll = "--filterAll";
-        if (cmd.startsWith(filterOutAll)) {
-          opts.filterAll = true;
-          continue;
-        }
+      final String size = "--size=";
+      if (cmd.startsWith(size)) {
+        opts.size = Float.parseFloat(cmd.substring(size.length()));
+        continue;
+      }
 
-        final String size = "--size=";
-        if (cmd.startsWith(size)) {
-          opts.size = Float.parseFloat(cmd.substring(size.length()));
-          continue;
-        }
+      final String splitPolicy = "--splitPolicy=";
+      if (cmd.startsWith(splitPolicy)) {
+        opts.splitPolicy = cmd.substring(splitPolicy.length());
+        continue;
+      }
 
-        final String bloomFilter = "--bloomFilter";
-        if (cmd.startsWith(bloomFilter)) {
-          opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
-          continue;
-        }
+      final String bloomFilter = "--bloomFilter";
+      if (cmd.startsWith(bloomFilter)) {
+        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
+        continue;
+      }
 
-        final String valueSize = "--valueSize=";
-        if (cmd.startsWith(valueSize)) {
-          opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
-          continue;
-        }
+      final String valueSize = "--valueSize=";
+      if (cmd.startsWith(valueSize)) {
+        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
+        continue;
+      }
 
-        final String valueRandom = "--valueRandom";
-        if (cmd.startsWith(valueRandom)) {
-          opts.valueRandom = true;
-          continue;
-        }
+      final String valueRandom = "--valueRandom";
+      if (cmd.startsWith(valueRandom)) {
+        opts.valueRandom = true;
+        continue;
+      }
 
-        final String period = "--period=";
-        if (cmd.startsWith(period)) {
-          opts.period = Integer.parseInt(cmd.substring(period.length()));
-          continue;
-        }
+      final String period = "--period=";
+      if (cmd.startsWith(period)) {
+        opts.period = Integer.parseInt(cmd.substring(period.length()));
+        continue;
+      }
 
-        Class<? extends Test> cmdClass = determineCommandClass(cmd);
-        if (cmdClass != null) {
-          opts.numClientThreads = getNumClients(i + 1, args);
-          if (opts.size != DEFAULT_OPTS.size &&
+      if (isCommandClass(cmd)) {
+        opts.cmdName = cmd;
+        opts.numClientThreads = Integer.parseInt(args.remove());
+        int rowsPerGB = ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize);
+        if (opts.size != DEFAULT_OPTS.size &&
             opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
-            throw new IllegalArgumentException(rows + " and " + size +
-              " are mutually exclusive arguments.");
-          }
-          // Calculate how many rows per gig.  If random value size presume that that half the max
-          // is average row size.
-          int rowsPerGB = ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize);
-          if (opts.size != DEFAULT_OPTS.size) {
-            // total size in GB specified
-            opts.totalRows = (int) opts.size * rowsPerGB;
-            opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
-          } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
-            // number of rows specified
-            opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
-            opts.size = opts.totalRows / rowsPerGB;
-          }
-          runTest(cmdClass, opts);
-          errCode = 0;
-          break;
+          throw new IllegalArgumentException(rows + " and " + size + " are mutually exclusive arguments.");
+        }
+        if (opts.size != DEFAULT_OPTS.size) {
+          // total size in GB specified
+          opts.totalRows = (int) opts.size * rowsPerGB;
+          opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
+        } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
+          // number of rows specified
+          opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
+          opts.size = opts.totalRows / rowsPerGB;
         }
+        break;
+      }
+    }
+    return opts;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    // Process command-line args. TODO: Better cmd-line processing
+    // (but hopefully something not as painful as cli options).
+    int errCode = -1;
+    if (args.length < 1) {
+      printUsage();
+      return errCode;
+    }
+
+    try {
+      LinkedList<String> argv = new LinkedList<String>();
+      argv.addAll(Arrays.asList(args));
+      TestOptions opts = parseOpts(argv);
 
+      // args remainting, print help and exit
+      if (!argv.isEmpty()) {
+        errCode = 0;
         printUsage();
-        break;
       }
+
+      // must run at least 1 client
+      if (opts.numClientThreads <= 0) {
+        throw new IllegalArgumentException("Number of clients must be > 0");
+      }
+
+      Class<? extends Test> cmdClass = determineCommandClass(opts.cmdName);
+      if (cmdClass != null) {
+        runTest(cmdClass, opts);
+        errCode = 0;
+      }
+
     } catch (Exception e) {
       e.printStackTrace();
     }
@@ -1605,8 +1683,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
     return errCode;
   }
 
-  private Class<? extends Test> determineCommandClass(String cmd) {
-    CmdDescriptor descriptor = commands.get(cmd);
+  private static boolean isCommandClass(String cmd) {
+    return COMMANDS.containsKey(cmd);
+  }
+
+  private static Class<? extends Test> determineCommandClass(String cmd) {
+    CmdDescriptor descriptor = COMMANDS.get(cmd);
     return descriptor != null ? descriptor.getCmdClass() : null;
   }