You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2012/01/18 04:06:55 UTC

svn commit: r1232728 [1/2] - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce: ./ loadtest/

Author: nspiegelberg
Date: Wed Jan 18 03:06:54 2012
New Revision: 1232728

URL: http://svn.apache.org/viewvc?rev=1232728&view=rev
Log:
[HBASE-4916] New MR HBase load tester with pluggable workloads

Summary:
This is a new load tester for HBase which supports pluggable workloads
and more flexibility in configuring how the load is exercised. This load
tester runs as a native map reduce job. Interfaces which define the
workloads can be extended for specific testing. The intensity of a load
can be controlled by the number of clients, number of threads and
desired operations per second.

Test Plan: Has been run continuously on a dev cluster and on TITANMIGRATE001.

Reviewers: kranganathan, mbautin, liyintang

Reviewed By: liyintang

CC: hbase-eng@lists, erling, mbautin, cgist, liyintang, nspiegelberg

Differential Revision: 372837

Task ID: 741952

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/CompositeOperationGenerator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/DataGenerator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetGenerator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetOperation.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/KeyCounter.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/LoadTest.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/MixedWorkloadGenerator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Operation.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/OperationGenerator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutGenerator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutOperation.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutReGenerator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/RandomDataGenerator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/SeparateWorkloadGenerator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/StatsCollector.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/VersionWorkloadGenerator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Workload.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java?rev=1232728&r1=1232727&r2=1232728&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java Wed Jan 18 03:06:54 2012
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import org.apache.hadoop.util.ProgramDriver;
+import org.apache.hadoop.hbase.mapreduce.loadtest.LoadTest;
 
 /**
  * Driver for hbase mapreduce jobs. Select which to run by passing
@@ -41,6 +42,7 @@ public class Driver {
                  "Complete a bulk data load.");
     pgd.addClass(CopyTable.NAME, CopyTable.class,
         "Export a table from local cluster to peer cluster");
+    pgd.addClass(LoadTest.NAME, LoadTest.class, "Load tester");
     pgd.driver(args);
   }
 }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/CompositeOperationGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/CompositeOperationGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/CompositeOperationGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/CompositeOperationGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,96 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An operation generator which is a composite of other operation generators.
+ * Child generators will be assigned weights and will contribute operations with
+ * frequency proportional to their weight.
+ */
+public class CompositeOperationGenerator implements OperationGenerator {
+
+  // Maintain a list of generators where each child is referenced the same
+  // multiple times according to its weight. It is expected that this list will
+  // rarely be updated, making a copy-on-write implementation efficient.
+  private final CopyOnWriteArrayList<OperationGenerator> generators;
+
+  // The sequence number is incremented for each operation generated, and is
+  // used to determine from which child to generate the next operation.
+  private final AtomicLong nextSequence;
+
+  /**
+   * Create a new instance with no child generators.
+   */
+  public CompositeOperationGenerator() {
+    nextSequence = new AtomicLong(0);
+    generators = new CopyOnWriteArrayList<OperationGenerator>();
+  }
+
+  /**
+   * Add a child generator with a certain weight. The added child generator will
+   * be used to generate operations with frequency proportional to its weight
+   * relative to the total weight of all child generators of this instance. The
+   * value of weights are not required to sum to any particular value. Child
+   * weights should be reduced by the lowest common divisor if possible.
+   *
+   * @param generator the child generator to be added
+   * @param weight the relative weight of the child generator
+   */
+  public void addGenerator(OperationGenerator generator, int weight) {
+    if (weight <= 0) {
+      throw new IllegalArgumentException("generator weights must be positive");
+    }
+    List<OperationGenerator> newGenerators =
+        new ArrayList<OperationGenerator>();
+    for (int i = 0; i < weight; i++) {
+      newGenerators.add(generator);
+    }
+    generators.addAll(newGenerators);
+  }
+
+  /**
+   * Remove a child generator from this instance. All child instances equal to
+   * the passed generator will be removed.
+   *
+   * @param generator
+   */
+  public void removeGenerator(OperationGenerator generator) {
+    while (generators.remove(generator));
+  }
+
+  /**
+   * Get the next operation in the combined sequence of operations of this
+   * instances child generators. The returned operation may be null if there are
+   * no children or if the next operation could not be generated. If a child
+   * generator becomes exhausted as a result of calling nextOperation() on it,
+   * then it will be removed from this instance's children.
+   *
+   * @return the next operation to be executed, or null if the next operation
+   *         could not be found
+   * @throws ExhaustedException if the last of this instance's child generators
+   *         has itself become exhausted
+   */
+  public Operation nextOperation(DataGenerator dataGenerator)
+      throws ExhaustedException {
+    if (generators.size() == 0) {
+      throw new ExhaustedException();
+    }
+
+    OperationGenerator next = generators.get(
+        (int)(Math.abs(nextSequence.getAndIncrement() % generators.size())));
+
+    try {
+      return next.nextOperation(dataGenerator);
+    } catch (ExhaustedException e) {
+      removeGenerator(next);
+      return null;
+    } catch (IndexOutOfBoundsException e) {
+      return null;
+    } catch (ArithmeticException e) {
+      return null;
+    }
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/DataGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/DataGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/DataGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/DataGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,126 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.MD5Hash;
+
+/**
+ * The DataGenerator class and its subclasses define the relationship between
+ * rows and columns and columns and values. These classes are used to construct
+ * the contents to write to a table as well as for verifying the integrity of
+ * the same data when it is read back from a table.
+ */
+public abstract class DataGenerator {
+
+  /**
+   * Get the column qualifiers for a specified row. A row with 0 columns is
+   * allowed.
+   *
+   * @param row
+   * @return the column qualifiers
+   */
+  public abstract byte[][] getColumnQualifiers(byte[] row);
+
+  /**
+   * Get the contents of a single column in a row.
+   *
+   * @param row
+   * @param column
+   * @return the contents of the column
+   */
+  public abstract byte[] getContent(byte[] row, byte[] column);
+
+  /**
+   * Get the contents of all columns in a row.
+   *
+   * @param row
+   * @param columns
+   * @return the contents of all columns
+   */
+  public abstract byte[][] getContents(byte[] row, byte[][] columns);
+
+  /**
+   * Verify that the specified result is consistent, that it has the expected
+   * column names and values for the corresponding key.
+   *
+   * @param result the result from a get or scan operation to be verified
+   * @return true if the result is consistent
+   */
+  public abstract boolean verify(Result result);
+
+  /**
+   * Construct a get operation which will get all of the columns of a specified
+   * key from a specified column family.
+   *
+   * @param key the key within the load-tester key space
+   * @param columnFamily
+   * @return a get operation for the specified key and column family
+   */
+  public abstract Get constructGet(long key, byte[] columnFamily);
+
+  /**
+   * Construct a put for the specified key into the specified column family. The
+   * returned put will contain all of the key values to be inserted in a single
+   * operation. This may return null if the row should have 0 columns.
+   *
+   * @param key the key within the load-tester key space
+   * @param columnFamily
+   * @return a single put containing all of the key values
+   */
+  public abstract Put constructBulkPut(long key, byte[] columnFamily);
+
+  /**
+   * Construct puts for the specified key into the specified column family. Each
+   * key value will be in a distinct put. This may return null if the row should
+   * have 0 columns.
+   *
+   * @param key the key within the load-tester key space
+   * @param columnFamily
+   * @return a list of puts, with each put having a single key value
+   */
+  public abstract List<Put> constructPuts(long key, byte[] columnFamily);
+
+  /**
+   * Construct a row key from a long key. The key will be prefixed by the MD5
+   * hash of the long key to randomize the order of keys.
+   *
+   * @param key the long key which is to be prefixed
+   * @return the prefixed key
+   */
+  public static String md5PrefixedKey(long key) {
+    String stringKey = Long.toString(key);
+    String md5hash = MD5Hash.getMD5AsHex(Bytes.toBytes(stringKey));
+
+    // flip the key to randomize
+    return md5hash + ":" + stringKey;
+  }
+
+  /**
+   * Create a new instance of the specified DataGenerator subclass, given the
+   * specified arguments. The structure of the arguments depends on the
+   * implementation of the DataGenerator subclass.
+   *
+   * @param className the full class name of the DataGenerator to instantiate
+   * @param args the arguments required by the DataGenerator implementation
+   * @return a new instance of the specified DataGenerator class
+   * @throws RuntimeException if for any reason the DataGenerator could not be
+   *         instantiated
+   */
+  public static DataGenerator newInstance(String className, String args) {
+    try {
+      @SuppressWarnings("unchecked")
+      Class<DataGenerator> theClass =
+          (Class<DataGenerator>)Class.forName(className);
+      Constructor<DataGenerator> constructor =
+          theClass.getDeclaredConstructor(String.class);
+      return (DataGenerator)constructor.newInstance(args);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,117 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.client.Get;
+
+/**
+ * Generates get operations for keys randomly chosen from those which have been
+ * marked as successfully completed. Typically this will be used with a
+ * KeyCounter that is shared by a generator of insert operations, to read keys
+ * which are known to have been inserted.
+ */
+public class GetGenerator implements OperationGenerator {
+
+  private KeyCounter keysWritten;
+  private byte[] columnFamily;
+  private Random random;
+  private double verifyFraction;
+  private int maxVersions;
+  private long minTime;
+  private long maxTime;
+  private long timeDelta;
+
+  /**
+   * Construct a generator for get operations which return a single version from
+   * any time range.
+   *
+   * @param columnFamily
+   * @param keysWritten the source of keys
+   * @param verifyFraction the fraction in [0,1] of operations to verify
+   */
+  public GetGenerator(byte[] columnFamily, KeyCounter keysWritten,
+      double verifyFraction) {
+    this(columnFamily, keysWritten, verifyFraction, 1, 0, Long.MAX_VALUE);
+  }
+
+  /**
+   * Construct a generator for get operations which should return at most a
+   * specified number of versions, each of which must be at most a specified
+   * age.
+   *
+   * @param columnFamily
+   * @param keysWritten the source of keys
+   * @param verifyFraction the fraction in [0,1] of operations to verify
+   * @param maxVersions the maximum number of versions to return
+   * @param timeDelta the maximum allowed age of a version, in milliseconds
+   */
+  public GetGenerator(byte[] columnFamily, KeyCounter keysWritten,
+      double verifyFraction, int maxVersions, long timeDelta) {
+    this(columnFamily, keysWritten, verifyFraction, maxVersions, 0, 0,
+        timeDelta);
+  }
+
+  /**
+   * Construct a generator for get operations which should return at most a
+   * specified number of versions, each of which must fall within a specified
+   * range of absolute timestamps.
+   *
+   * @param columnFamily
+   * @param keysWritten the source of keys
+   * @param verifyFraction the fraction in [0,1] of operations to verify
+   * @param maxVersions the maximum number of versions to return
+   * @param minTime the earliest allowable timestamp on a version
+   * @param maxTime the latest allowable timestamp on a version
+   */
+  public GetGenerator(byte[] columnFamily, KeyCounter keysWritten,
+      double verifyFraction, int maxVersions, long minTime, long maxTime) {
+    this(columnFamily, keysWritten, verifyFraction, maxVersions, minTime,
+        maxTime, 0);
+  }
+
+  /**
+   * Private constructor, used by public constructors to set all properties.
+   */
+  private GetGenerator(byte[] columnFamily, KeyCounter keysWritten,
+      double verifyFraction, int maxVersions, long minTime, long maxTime,
+      long timeDelta) {
+    this.keysWritten = keysWritten;
+    this.columnFamily = columnFamily;
+    this.random = new Random();
+    this.verifyFraction = verifyFraction;
+    this.maxVersions = maxVersions;
+    this.minTime = minTime;
+    this.maxTime = maxTime;
+    this.timeDelta = timeDelta;
+  }
+
+  /**
+   * Get the next operation. This may return null if no successful key could be
+   * found.
+   *
+   * @return the next get operation for a successful key, or null if none found
+   */
+  public Operation nextOperation(DataGenerator dataGenerator) {
+    try {
+      long key = keysWritten.getRandomKey();
+
+      Get get = dataGenerator.constructGet(key, columnFamily);
+      try {
+        get.setMaxVersions(maxVersions);
+        if (timeDelta > 0) {
+          long currentTime = System.currentTimeMillis();
+          get.setTimeRange(currentTime - timeDelta, currentTime);
+        } else {
+          get.setTimeRange(minTime, maxTime);
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+      boolean verify = random.nextDouble() < verifyFraction;
+      return new GetOperation(key, get, verify ? dataGenerator : null);
+    } catch (KeyCounter.NoKeysException e) {
+      return null;
+    }
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetOperation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetOperation.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetOperation.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetOperation.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,59 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * An operation which reads a row from a table. The result of the operation is
+ * held internally and might be verified.
+ */
+public class GetOperation extends Operation{
+
+  private final long key;
+  private final Get get;
+  private Result result;
+  private DataGenerator dataGenerator;
+
+  public Operation.Type getType() {
+    return Operation.Type.GET;
+  }
+
+  /**
+   * Construct a new get operation.
+   *
+   * @param key the key to get
+   * @param get the populated get object
+   * @param dataGenerator the DataGenerator to use to verify the result, or null
+   *        if the result should not be verified
+   */
+  public GetOperation(long key, Get get, DataGenerator dataGenerator) {
+    this.key = key;
+    this.get = get;
+    this.result = null;
+    this.dataGenerator = dataGenerator;
+  }
+
+  public void perform(HTable table) throws IOException {
+    result = table.get(get);
+  }
+
+  public void postAction() {
+    if (dataGenerator != null) {
+      if (!dataGenerator.verify(result)) {
+        StatsCollector.getInstance().getStats(getType()).incrementErrors(1);
+        System.err.println("Verification failed for key " + key);
+      }
+    }
+  }
+
+  public long getNumKeys() {
+    return 1;
+  }
+
+  public long getNumColumns() {
+    return result == null ? 0 : result.size();
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/KeyCounter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/KeyCounter.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/KeyCounter.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/KeyCounter.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,145 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.io.Serializable;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A class for tracking successful or failed keys. For keys in the space of
+ * longs, and are traversed in a linear sequence of all consecutive keys,
+ * instances of this class track which keys have been successfully processed or
+ * not and can return a random successful key.
+ *
+ * It is assumed that the number of keys which succeed is orders of magnitude
+ * greater than the number of keys which fail. The memory required by instances
+ * of this class is constant in the number of successful keys (assuming a bound
+ * on the duration for which a key can be in process) and linear in the number
+ * of failed keys.
+ */
+public class KeyCounter implements Serializable {
+
+  private static final long serialVersionUID = 8011474084218253286L;
+
+  // The number of recent successful keys which should be buffered for returning
+  // random successful keys if the standard approach fails.
+  private final static int NUM_RECENT_KEYS = 10000;
+
+  // The first key that is expected to be counted. All internal variables in the
+  // key space are stored after subtracting startKey from them, so that the
+  // internal key space always starts from 0 and is less likely to overflow.
+  private final long startKey;
+
+  // Store all keys that have failed.
+  private final ConcurrentHashMap<Long,Long> failedKeySet;
+
+  // Store some number of recently successful row keys so that there is a bound
+  // on the time it takes to get a random successful key. The map key is in the
+  // range [0, NUM_RECENT_KEYS) and the value is a row key that was recently
+  // successful.
+  private final ConcurrentHashMap<Long,Long> recentSuccessfulKeys;
+
+  // Used for inserting and overwriting recently successful keys in order of
+  // recentness.
+  private final AtomicLong numSuccessfulKeys;
+
+  // The greatest completed key contiguous to the range of completed keys
+  // starting with the startKey.
+  private final AtomicLong lastContiguousKey;
+
+  // A queue of keys which have been completed but are not contiguous to the
+  // completed range of keys starting with the startKey.
+  private final PriorityQueue<Long> keysCompleted;
+
+  // Used to randomly choose a completed successful key.
+  private final Random random;
+
+  /**
+   * Thrown if there are no completed successful keys.
+   */
+  public class NoKeysException extends Exception {
+    private static final long serialVersionUID = -7069323512531319455L;
+  }
+
+  /**
+   * Create a new KeyCounter which starts counting keys from startKey.
+   *
+   * @param startKey the first key expected to be counted
+   */
+  public KeyCounter(long startKey) {
+    this.startKey = startKey;
+    numSuccessfulKeys = new AtomicLong(0);
+    lastContiguousKey = new AtomicLong(-1);
+    keysCompleted = new PriorityQueue<Long>();
+    failedKeySet = new ConcurrentHashMap<Long,Long>();
+    recentSuccessfulKeys = new ConcurrentHashMap<Long, Long>(
+        (int)(1.1 * NUM_RECENT_KEYS), 0.75f, 100);
+    random = new Random();
+  }
+
+  /**
+   * Get a random key which was marked as successful.
+   *
+   * @return a random, successful key
+   * @throws NoKeysException if there have been no successful keys
+   */
+  public long getRandomKey() throws NoKeysException {
+    long last = lastContiguousKey.get();
+    if (last == -1) {
+      throw new NoKeysException();
+    }
+    long key = random.nextInt() << 32 | random.nextInt();
+    if (last == 0) {
+      key = 0;
+    } else {
+      key = Math.abs(key % last);
+    }
+    if (failedKeySet.containsKey(key)) {
+      if (recentSuccessfulKeys.size() > 0) {
+        key = recentSuccessfulKeys.get(
+            Math.abs(key % recentSuccessfulKeys.size()));
+      } else {
+        throw new NoKeysException();
+      }
+    }
+    // Translate to the external key space.
+    return key + startKey;
+  }
+
+  /**
+   * Mark a specified key as processed and either successful or failed.
+   *
+   * @param key the key which has been processed
+   * @param success true if the processing succeeded
+   */
+  public void markKey(long key, boolean success) {
+    // Translate to the internal key space.
+    key -= startKey;
+
+    if (success) {
+      recentSuccessfulKeys.put(
+          numSuccessfulKeys.getAndIncrement() % NUM_RECENT_KEYS, key);
+    } else {
+      failedKeySet.put(key, key);
+    }
+
+    synchronized (keysCompleted) {
+      if (key > lastContiguousKey.get() + 1) {
+        keysCompleted.offer(key);
+      } else if (key == lastContiguousKey.get() + 1) {
+        long last = key;
+        Long next;
+        while ((next = keysCompleted.peek()) != null &&
+            next.longValue() == last + 1) {
+          keysCompleted.poll();
+          last++;
+        }
+        lastContiguousKey.set(last);
+      } else {
+        // This key should already be represented in this counter.
+        return;
+      }
+    }
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/LoadTest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/LoadTest.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/LoadTest.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/LoadTest.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,299 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class LoadTest extends Configured implements Tool {
+
+  public static final String NAME = "loadtest";
+
+  private static final int DEFAULT_JMX_PORT = 8991;
+
+  private static final String WORKLOADGENERATOR =
+      "hbase.loadtester.workloadgenerator";
+
+  private static final String WORKLOADGENERATOR_ARGS =
+      "hbase.loadtester.generator.args";
+
+  private static final String TABLENAME = "hbase.loadtester.tablename";
+
+  private static final String DEFAULT_TABLENAME = "test1";
+
+  private static final String DATAGENERATOR =
+      "hbase.loadtester.datagenerator";
+
+  private static final String DATAGENERATOR_ARGS =
+      "hbase.loadtester.datagenerator.args";
+
+  private static final String DEFAULT_DATAGENERATOR =
+      RandomDataGenerator.class.getName();
+
+  public static class Map
+      extends Mapper<LongWritable, Text, LongWritable, Text> {
+
+    // The mapper should only be executed once. Ensure that each mapper only
+    // sends workloads once by flagging the first pass of each mapper. The
+    // driver will be responsible for ensuring that there is only one mapper.
+    private boolean firstPass;
+
+    public Map() {
+      this.firstPass = true;
+    }
+
+    public void map(LongWritable key, Text value,
+        Mapper<LongWritable, Text, LongWritable, Text>.Context context)
+            throws IOException, InterruptedException {
+      if (firstPass) {
+        // Determine from the job configuration which workloads to generate.
+        String className = context.getConfiguration().get(WORKLOADGENERATOR);
+        Workload.Generator workloadGenerator =
+            Workload.Generator.newInstance(className);
+
+        // Determine from the job configuration the name of the HBase table to
+        // use, and create that table according to the requirements of the
+        // workload generator.
+        String tableName = context.getConfiguration().get(TABLENAME,
+            DEFAULT_TABLENAME);
+        workloadGenerator.setupTable(configurationFromZooKeeper(
+            context.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM)),
+            tableName);
+
+        // Determine from the job configuration the arguments for the workload
+        // generator.
+        String generatorArgs =
+            context.getConfiguration().get(WORKLOADGENERATOR_ARGS);
+
+        // Generate the workloads for each of the reducer tasks.
+        List<List<Workload>> workloads =
+            workloadGenerator.generateWorkloads(context.getNumReduceTasks(),
+                generatorArgs);
+
+        // Serialize the workloads assigned to each reducer, and write them out.
+        // Multiple workloads for the same client may share common objects, so
+        // they must be serialized together into a single blob.
+        for (int i = 0; i < workloads.size(); i++) {
+          ByteArrayOutputStream baos = new ByteArrayOutputStream();
+          ObjectOutputStream out = new ObjectOutputStream(baos);
+          out.writeObject(workloads.get(i));
+          out.close();
+
+          // The key is used to partition the workloads to different reducers.
+          context.write(new LongWritable(i), new Text(baos.toByteArray()));
+        }
+        firstPass = false;
+      }
+    }
+  }
+
+  public static class Reduce
+      extends Reducer<LongWritable, Text, LongWritable, Text> {
+
+    public void reduce(LongWritable key, Iterable<Text> values,
+        Reducer<LongWritable, Text, LongWritable, Text>.Context context)
+            throws IOException, InterruptedException {
+      HBaseConfiguration conf = configurationFromZooKeeper(
+          context.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
+
+      Operation.setConfAndTableName(conf, context.getConfiguration().get(
+          TABLENAME, DEFAULT_TABLENAME).getBytes());
+
+      DataGenerator dataGenerator = DataGenerator.newInstance(
+          context.getConfiguration().get(DATAGENERATOR, DEFAULT_DATAGENERATOR),
+          context.getConfiguration().get(DATAGENERATOR_ARGS));
+
+      StatsCollector statsCollector = StatsCollector.getInstance();
+
+      List<Workload.Executor> executors = new ArrayList<Workload.Executor>();
+
+      for (Text value : values) {
+        try {
+          // Deserialize the workloads from the input.
+          ObjectInputStream in = new ObjectInputStream(
+              new ByteArrayInputStream(value.getBytes()));
+
+          @SuppressWarnings("unchecked")
+          List<Workload> workloads = (List<Workload>)in.readObject();
+          for (Workload workload : workloads) {
+            // Run each workload in its own executor, with dedicated thread
+            // pools per workload.
+            Workload.Executor executor =
+                new Workload.Executor(workload, dataGenerator);
+            executors.add(executor);
+            executor.start();
+
+            // Initialize the stats for each of the types of operations used
+            // by this workload.
+            statsCollector.initializeTypes(workload.getOperationTypes());
+          }
+        } catch (ClassNotFoundException e) {
+          e.printStackTrace();
+        }
+      }
+
+      // Start the reporters for the initialized types of operations.
+      statsCollector.startMBeanReporters();
+      statsCollector.startConsoleReporters();
+      statsCollector.startMapReduceCounterReporter(context);
+
+      // Wait for the executors to finish. They should never actually finish,
+      // but if the inputs are exhausted then the reducer will terminate and
+      // the job will finish prematurely.
+      for (Workload.Executor executor : executors) {
+        executor.waitForFinish();
+      }
+    }
+  }
+
+  public static class Partition extends Partitioner<LongWritable, Text> {
+    public int getPartition(LongWritable key, Text value, int numPartitions) {
+      // Keys should be uniformly distributed across reducers.
+      return (int)(key.get() % numPartitions);
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  public static HBaseConfiguration configurationFromZooKeeper(String zkQuorum) {
+    Configuration conf = new Configuration();
+    conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
+    // This constructor is deprecated, but there seems to be no other way to get
+    // a HBaseConfiguration instance, and casting from Configuration fails at
+    // runtime.
+    return new HBaseConfiguration(conf);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = this.getConf();
+
+    String[] otherArgs =
+        new GenericOptionsParser(conf, args).getRemainingArgs();
+    Options options = new Options();
+    options.addOption("zk", true, "ZooKeeper quorum");
+    options.addOption("tasks", true, "Number of client tasks");
+    options.addOption("table", true, "HBase table name");
+    options.addOption("wl", true, "WorkloadGenerator class name");
+    options.addOption("wlargs", true, "WorkloadGenerator arguments");
+    options.addOption("data", true, "DataGenerator class name");
+    options.addOption("dataargs", true, "DataGenerator arguments");
+    options.addOption("jmx", true, "Port number for remote JMX");
+
+    CommandLine cmd = new GnuParser().parse(options, otherArgs);
+    if (cmd.hasOption("zk")) {
+      String quorum = "";
+      for (String node : cmd.getOptionValues("zk")) {
+        quorum += node + ",";
+      }
+      conf.set(HConstants.ZOOKEEPER_QUORUM,
+          quorum.substring(0, quorum.length() - 1));
+    } else {
+      System.err.println("ZooKeeper quorum must be specified");
+      System.exit(1);
+    }
+
+    int numTasks = 1;
+    if (cmd.hasOption("tasks")) {
+      numTasks = Integer.parseInt(cmd.getOptionValue("tasks"));
+      if (numTasks == 0) {
+        System.err.println("The number of client tasks must be positive");
+        System.exit(1);
+      }
+    } else {
+      System.err.println("The number of client tasks must be specified");
+      System.exit(1);
+    }
+
+    if (cmd.hasOption("table")) {
+      conf.set(TABLENAME, cmd.getOptionValue("table"));
+    }
+
+    if (cmd.hasOption("wl")) {
+      conf.set(WORKLOADGENERATOR, cmd.getOptionValue("wl"));
+    } else {
+      System.err.println("No workloads specified");
+      System.exit(1);
+    }
+
+    if (cmd.hasOption("wlargs")) {
+      conf.set(WORKLOADGENERATOR_ARGS, cmd.getOptionValue("wlargs"));
+    }
+
+    if (cmd.hasOption("data")) {
+      conf.set(DATAGENERATOR, cmd.getOptionValue("data"));
+    }
+
+    if (cmd.hasOption("dataargs")) {
+      conf.set(DATAGENERATOR_ARGS, cmd.getOptionValue("dataargs"));
+    }
+
+    int jmxPort = DEFAULT_JMX_PORT;
+    if (cmd.hasOption("jmx")) {
+      jmxPort = Integer.parseInt(cmd.getOptionValue("jmx"));
+    }
+    conf.set("mapred.child.java.opts",
+        "-Dcom.sun.management.jmxremote.authenticate=false" +
+        " -Dcom.sun.management.jmxremote.ssl=false" +
+        " -Dcom.sun.management.jmxremote.port=" + jmxPort);
+
+    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
+    String currentDate = dateFormat.format(new Date());
+
+    // Create a a single input file with a single key-value pair. The contents
+    // of the input file are irrelevant as the mapper does not care. The
+    // important constraint is that there is only one input file. The input and
+    // output paths are date-specific to avoid conflict, though no output is
+    // expected.
+    FileSystem fs = FileSystem.get(conf);
+    Path inputPath = new Path("/tmp/input/" + currentDate);
+    Path outputPath = new Path("/tmp/output/" + currentDate);
+    OutputStream out = fs.create(inputPath);
+    out.write("0\t0".getBytes());
+    out.close();
+
+    Job job = new Job(conf, "LoadTester_" + currentDate + "_" +
+        Class.forName(conf.get(WORKLOADGENERATOR)).getSimpleName());
+    job.setJarByClass(LoadTest.class);
+    job.setMapperClass(Map.class);
+    job.setReducerClass(Reduce.class);
+    job.setPartitionerClass(Partition.class);
+    job.setOutputKeyClass(LongWritable.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumReduceTasks(numTasks);
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int result = ToolRunner.run(new LoadTest(), args);
+    System.exit(result);
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/MixedWorkloadGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/MixedWorkloadGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/MixedWorkloadGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/MixedWorkloadGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,100 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+
+/**
+ * A generator which mixes insert and get operations in a single threadpool,
+ * such that the ratio of operation types is strictly enforced.
+ */
+public class MixedWorkloadGenerator extends Workload.Generator {
+
+  private static final byte[] columnFamily = "actions".getBytes();
+
+  private int opsPerSecond = Integer.MAX_VALUE;
+  private int numThreads = 20;
+  private int insertWeight = 1;
+  private int getWeight = 1;
+  private double getVerificationFraction = 0.05;
+
+  public List<List<Workload>> generateWorkloads(int numWorkloads, String args) {
+    if (args != null) {
+      String[] splits = args.split(":");
+      if (splits.length != 5) {
+        throw new IllegalArgumentException("Wrong number of argument splits");
+      }
+      opsPerSecond = Integer.parseInt(splits[0]);
+      numThreads = Integer.parseInt(splits[1]);
+      insertWeight = Integer.parseInt(splits[2]);
+      getWeight = Integer.parseInt(splits[3]);
+      getVerificationFraction = Double.parseDouble(splits[4]);
+    }
+
+    List<List<Workload>> workloads = new ArrayList<List<Workload>>(numWorkloads);
+    for (int i = 0; i < numWorkloads; i++) {
+      List<Workload> clientWorkloads = new ArrayList<Workload>();
+      long startKey = Long.MAX_VALUE / numWorkloads * i;
+      clientWorkloads.add(new MixedWorkload(startKey, opsPerSecond, numThreads,
+          insertWeight, getWeight, getVerificationFraction));
+      workloads.add(clientWorkloads);
+    }
+    return workloads;
+  }
+
+  public HTableDescriptor getTableDescriptor() {
+    HTableDescriptor desc = new HTableDescriptor();
+    desc.addFamily(new HColumnDescriptor(columnFamily));
+    return desc;
+  }
+
+  public static class MixedWorkload implements Workload {
+
+    private static final long serialVersionUID = 576338016524909210L;
+    private long startKey;
+    private int opsPerSecond;
+    private int numThreads;
+    private int insertWeight;
+    private int getWeight;
+    private double getVerificationFraction;
+
+    public MixedWorkload(long startKey, int opsPerSecond, int numThreads,
+        int insertWeight, int getWeight, double getVerificationFraction) {
+      this.startKey = startKey;
+      this.opsPerSecond = opsPerSecond;
+      this.numThreads = numThreads;
+      this.insertWeight = insertWeight;
+      this.getWeight = getWeight;
+      this.getVerificationFraction = getVerificationFraction;
+    }
+
+    public OperationGenerator constructGenerator() {
+      KeyCounter keysWritten = new KeyCounter(startKey);
+      PutGenerator insertGenerator =
+          new PutGenerator(columnFamily, keysWritten, startKey, true);
+      GetGenerator getGenerator =
+          new GetGenerator(columnFamily, keysWritten, getVerificationFraction);
+
+      CompositeOperationGenerator compositeGenerator =
+          new CompositeOperationGenerator();
+      compositeGenerator.addGenerator(insertGenerator, insertWeight);
+      compositeGenerator.addGenerator(getGenerator, getWeight);
+      return compositeGenerator;
+    }
+
+    public int getNumThreads() {
+      return numThreads;
+    }
+
+    public int getOpsPerSecond() {
+      return opsPerSecond;
+    }
+
+    public EnumSet<Operation.Type> getOperationTypes() {
+      return EnumSet.of(Operation.Type.BULK_PUT, Operation.Type.GET);
+    }
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Operation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Operation.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Operation.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Operation.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,129 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.loadtest.StatsCollector.Stats;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+
+/**
+ * Represents an action that can be performed against an HBase HTable. The HBase
+ * configuration and table are defined statically. The Operation class maintains
+ * thread-local references to HTables, allowing multiple concurrent operations
+ * to be performed against the same table. Subclasses of Operation are not
+ * required to be thread-safe, and should not be accessed from multiple threads
+ * concurrently.
+ */
+public abstract class Operation implements Runnable {
+
+  private static volatile HBaseConfiguration conf;
+  private static volatile byte[] tableName;
+
+  /**
+   * HTable instances are not thread-safe, so they need to be encapsulated in a
+   * ThreadLocal variable.
+   */
+  private static final ThreadLocal<HTable> table = new ThreadLocal<HTable>() {
+    @Override
+    protected HTable initialValue() {
+      try {
+        return new HTable(conf, tableName);
+      } catch (IOException e) {
+        e.printStackTrace();
+        return null;
+      }
+    }
+  };
+
+  /**
+   * Initialize the HBase configuration and table name against which all
+   * operations will be performed. This must be done before any operation is
+   * executed.
+   *
+   * @param conf the HBase configuration to use
+   * @param tableName the name of the HBase table against which to operate
+   */
+  public static void setConfAndTableName(
+      HBaseConfiguration conf, byte[] tableName) {
+    Operation.conf = conf;
+    Operation.tableName = tableName;
+  }
+
+  /**
+   * The type of operation, used for reporting and statistics.
+   */
+  public enum Type {
+    GET("get", "R"),
+    BULK_PUT("bulk-put", "W"),
+    PUT("put", "W");
+
+    public final String name;
+    public final String shortName;
+
+    private Type(String name, String shortName) {
+      this.name = name;
+      this.shortName = shortName;
+    }
+  };
+
+  /**
+   * Get the type of the operation represented by the subclass.
+   * @return the type of the operation
+   */
+  public abstract Type getType();
+
+  /**
+   * Perform this operation against a specific HBase table.
+   *
+   * @param table HBase table against which to perform the operation
+   * @throws IOException
+   */
+  public abstract void perform(HTable table) throws IOException;
+
+  /**
+   * Do any cleanup or verification required after the operation is performed.
+   * This will be called after the operation is performed, regardless of whether
+   * or not the operation was successful or threw an exception.
+   */
+  public abstract void postAction();
+
+  /**
+   * Get the number of keys affected by this operation. For some operations,
+   * which may be part of a group of operations acting on a single key, this
+   * method may return 0 for all but the last part to complete.
+   *
+   * @return the number of keys affected by this operation
+   */
+  public abstract long getNumKeys();
+
+  /**
+   * Get the number of columns affected by this operation.
+   *
+   * @return the number of columns affected by this operation
+   */
+  public abstract long getNumColumns();
+
+  /**
+   * Performs this operation against the table.
+   */
+  public void run() {
+    StatsCollector.Stats stats =
+        StatsCollector.getInstance().getStats(this.getType());
+    long start = System.currentTimeMillis();
+
+    try {
+      perform(table.get());
+    } catch (IOException e) {
+      stats.incrementFailures(1);
+    } catch (Exception e) {
+      e.printStackTrace();
+      stats.incrementFailures(1);
+    } finally {
+      long elapsed = System.currentTimeMillis() - start;
+      stats.incrementCumulativeOpTime(elapsed);
+      stats.incrementKeys(getNumKeys());
+      stats.incrementColumns(getNumColumns());
+      postAction();
+    }
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/OperationGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/OperationGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/OperationGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/OperationGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,36 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+/**
+ * OperationGenerators provide a sequence of operations according to some rules
+ * or patterns defined by the implementation.
+ */
+public interface OperationGenerator {
+
+  /**
+   * Get the next operation to be executed. This method may occasionally return
+   * null if there was a transient issue in generating the operation. Returning
+   * null is not an error and should simply be ignored. This allows other
+   * OperationGenerator instances an opportunity to return operations instead of
+   * waiting for any particular instance to return its next value operation.
+   *
+   * Implementations may be intended to only provide a certain number of
+   * operations. Those implementations will thereafter throw ExhaustedExceptions
+   * when nextOperation() is invoked on them. Any instance which has thrown an
+   * ExhaustedException should no longer be asked for more operations, unless
+   * implementation-specific measures are used to reset that instance.
+   *
+   * @param dataGenerator DataGenerator used to construct the next operation
+   * @return the next operation, or null
+   * @throws ExhaustedException if the
+   */
+  public Operation nextOperation(DataGenerator dataGenerator)
+      throws ExhaustedException;
+
+  /**
+   * Thrown when an instance has run out of operations to return.
+   */
+  public static class ExhaustedException extends Exception {
+    private static final long serialVersionUID = -5647515931122719787L;
+  }
+
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,104 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.client.Put;
+
+/**
+ * Generates put operations, either bulk inserts or single key-value inserts.
+ * Operations are generated for consecutive increasing keys within the load
+ * tester key space, starting from a specified key.
+ */
+public class PutGenerator implements OperationGenerator {
+
+  protected final AtomicLong nextKey;
+  protected final byte[] columnFamily;
+  protected final KeyCounter keysWritten;
+
+  // If queue is null, then bulk inserts will be generated, otherwise, inserts
+  // of single KeyValues will be generated.
+  private final LinkedBlockingQueue<PutOperation> queue;
+
+  /**
+   * Default constructor.
+   *
+   * @param columnFamily
+   * @param keysWritten the sink to be populated with written keys
+   * @param startKey the first key to insert
+   * @param bulkInsert if true, operations will each insert multiple key-values
+   */
+  public PutGenerator(byte[] columnFamily, KeyCounter keysWritten,
+      long startKey, boolean bulkInsert) {
+    nextKey = new AtomicLong(startKey);
+    this.columnFamily = columnFamily;
+    this.keysWritten = keysWritten;
+    if (bulkInsert) {
+      this.queue = null;
+    } else {
+      this.queue = new LinkedBlockingQueue<PutOperation>();
+    }
+  }
+
+  public Operation nextOperation(DataGenerator dataGenerator) {
+    // Loop until we get a put operation or an error returns null.
+    while (true) {
+      // Check if there are queued operations which were parts of previous puts.
+      if (queue != null) {
+        PutOperation head = queue.poll();
+        if (head != null) {
+          return head;
+        }
+      }
+
+      try {
+        long key = getNextKey();
+        PutOperation put;
+        if (queue != null) {
+          put = getPut(dataGenerator, key);
+        } else {
+          put = getBulkPut(dataGenerator, key);
+        }
+        if (put != null) {
+          return put;
+        }
+        // The next key was not supposed to be written, so try again.
+      } catch (KeyCounter.NoKeysException e) {
+        // There were no keys to be written, do not try again.
+        return null;
+      }
+    }
+  }
+
+  protected long getNextKey() throws KeyCounter.NoKeysException {
+    return nextKey.getAndIncrement();
+  }
+
+  protected PutOperation getBulkPut(DataGenerator dataGenerator, long key) {
+    Put put = dataGenerator.constructBulkPut(key, columnFamily);
+    if (put != null) {
+      return new PutOperation(key, put, keysWritten);
+    } else {
+      // Key was defined to be skipped, mark it as complete so it can be read.
+      keysWritten.markKey(key, true);
+      return null;
+    }
+  }
+
+  protected PutOperation getPut(DataGenerator dataGenerator, long key) {
+    List<Put> puts = dataGenerator.constructPuts(key, columnFamily);
+    if (puts != null) {
+      AtomicInteger remainingParts = new AtomicInteger(puts.size());
+      for (int i = 1; i < puts.size(); i++) {
+        queue.offer(new PutOperation(key, puts.get(i), keysWritten,
+            remainingParts));
+      }
+      return new PutOperation(key, puts.get(0), keysWritten, remainingParts);
+    } else {
+      keysWritten.markKey(key, true);
+      return null;
+    }
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutOperation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutOperation.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutOperation.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutOperation.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,90 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+
+/**
+ * An operation that inserts one or more key-values.
+ */
+public class PutOperation extends Operation {
+
+  private final Put put;
+  private final long key;
+  private boolean success;
+  private final KeyCounter writtenKeys;
+
+  // Multi-part operations need to coordinate between themselves so that only
+  // one of the parts makes certain updates. Related parts share an atomic
+  // counter, allowing them to determine which part was last to complete.
+  private final AtomicInteger partsRemaining;
+  private boolean wasLast;
+
+  /**
+   * Create a single-part (bulk insert) put operation.
+   *
+   * @param key the load-tester key space key being inserted
+   * @param put
+   * @param writtenKeys the sink of keys to be updated after execution
+   */
+  public PutOperation(long key, Put put, KeyCounter writtenKeys) {
+    this(key, put, writtenKeys, null);
+  }
+
+  /**
+   * Create a multi-part (non-bulk insert) put operation.
+   *
+   * @param key
+   * @param put
+   * @param partsRemaining shared counter of parts remaining to execute
+   * @param writtenKeys the sink of keys to be updated after execution
+   */
+  public PutOperation(long key, Put put, KeyCounter writtenKeys,
+      AtomicInteger partsRemaining) {
+    this.put = put;
+    this.key = key;
+    this.writtenKeys = writtenKeys;
+    this.success = false;
+
+    this.partsRemaining = partsRemaining;
+    this.wasLast = false;
+  }
+
+  public Operation.Type getType() {
+    return (partsRemaining == null)
+        ? Operation.Type.BULK_PUT
+        : Operation.Type.PUT;
+  }
+
+  public void perform(HTable table) throws IOException {
+    table.put(put);
+    success = true;
+    if (partsRemaining != null) {
+      // If this was not a bulk insert, determine if it was the last part.
+      wasLast = (partsRemaining.decrementAndGet() == 0);
+    }
+  }
+
+  /**
+   * Mark the key as completed so it can be used in get requests.
+   */
+  public void postAction() {
+    if (partsRemaining == null || wasLast) {
+      // Only mark the key complete if it was a bulk insert or the last part of
+      // several related inserts for the same key.
+      writtenKeys.markKey(key, success);
+    }
+  }
+
+  public long getNumKeys() {
+    // Return 1 only if this was the only (bulk) insert or if it was the last of
+    // several related inserts for the same key.
+    return (partsRemaining == null || wasLast) ? 1 : 0;
+  }
+
+  public long getNumColumns() {
+    return put.size();
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutReGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutReGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutReGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutReGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,30 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+/**
+ * Generates bulk puts for random keys which have already been successfully
+ * processed by something else. This is typically used for overwriting keys
+ * which were previously inserted by another generator, though this is not a
+ * requirement.
+ */
+public class PutReGenerator extends PutGenerator {
+
+  /**
+   * Default constructor.
+   *
+   * @param columnFamily
+   * @param keysWritten the source of keys to write and sink of keys written
+   * @param bulkInsert if true, operations will each insert multiple key-values
+   */
+  public PutReGenerator(byte[] columnFamily, KeyCounter keysWritten,
+      boolean bulkInsert) {
+    super(columnFamily, keysWritten, 0, bulkInsert);
+  }
+
+  /**
+   * Override superclass behavior to get a random key instead of the next
+   * sequential key.
+   */
+  protected long getNextKey() throws KeyCounter.NoKeysException {
+    return keysWritten.getRandomKey();
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/RandomDataGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/RandomDataGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/RandomDataGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/RandomDataGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,172 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * A DataGenerator that pseudo-randomly chooses the number, qualifiers and
+ * contents of columns. PRNGs are seeded in a repeatable way per row key.
+ */
+public class RandomDataGenerator extends DataGenerator {
+
+  private static final int MIN_COLUMNS = 1;
+  private static final int MAX_COLUMNS = 65535;
+
+  private static final double DEFAULT_NUM_COLUMNS_MEAN = 10;
+  private static final double DEFAULT_QUALIFIER_SIZE_MEAN = 16;
+  private static final double DEFAULT_COLUMN_SIZE_MEAN = 512;
+
+  private final double numColumnsMean;
+  private final double qualifierSizeMean;
+  private final double columnSizeMean;
+
+  /**
+   * Constructs a new RandomDataGenerator, parsing properties from the passed
+   * arguments if any, using defaults otherwise.
+   *
+   * @param args
+   */
+  public RandomDataGenerator(String args) {
+    if (args != null) {
+      String[] splits = args.split(":");
+      if (splits.length != 3) {
+        throw new IllegalArgumentException("wrong number of parameters");
+      }
+      numColumnsMean = Double.parseDouble(splits[0]);
+      qualifierSizeMean = Double.parseDouble(splits[1]);
+      columnSizeMean = Double.parseDouble(splits[2]);
+    } else {
+      numColumnsMean = DEFAULT_NUM_COLUMNS_MEAN;
+      qualifierSizeMean = DEFAULT_QUALIFIER_SIZE_MEAN;
+      columnSizeMean = DEFAULT_COLUMN_SIZE_MEAN;
+    }
+  }
+
+  /**
+   * Choose a random number of columns between MIN_COLUMNS and MAX_COLUMNS (both
+   * inclusive) with a gaussian distribution around numColumnsMean. The length
+   * of each column qualifier is chosen randomly with a gaussian distribution.
+   * The first two bytes of each column qualifier are the index of that column
+   * within the row (in big endian order) and the rest of the qualifier is
+   * chosen randomly.
+   */
+  public byte[][] getColumnQualifiers(byte[] row) {
+    Random random = new Random(new String(row).hashCode());
+    int numCol = (int) Math.min(Math.max(MIN_COLUMNS,
+        Math.round(random.nextGaussian() + numColumnsMean)), MAX_COLUMNS);
+    byte[][] qualifiers = new byte[numCol][];
+
+    for (int i = 0; i < numCol; i++) {
+      int qualifierLength = (int) Math.max(2,
+          Math.round(random.nextGaussian() + qualifierSizeMean));
+      qualifiers[i] = new byte[qualifierLength];
+      random.nextBytes(qualifiers[i]);
+      qualifiers[i][0] = (byte)((i >>> 8) & 0xff);
+      qualifiers[i][1] = (byte)(i & 0xff);
+    }
+    return qualifiers;
+  }
+
+  /**
+   * Column content length and value are chosen randomly, seeded by the row key.
+   */
+  public byte[] getContent(byte[] row, byte[] column) {
+    Random random = new Random(new String(column).hashCode());
+    int contentLength =
+        (int) Math.max(1, random.nextGaussian() + columnSizeMean);
+    byte[] content = new byte[contentLength];
+    random.nextBytes(content);
+    return content;
+  }
+
+  /**
+   * Column content length and value are chosen randomly, seeded by the row key.
+   */
+  public byte[][] getContents(byte[] row, byte[][] columns) {
+    byte[][] contents = new byte[columns.length][];
+    Random random = new Random();
+    for (int i = 0; i < columns.length; i++) {
+      random.setSeed(new String(columns[i]).hashCode());
+      int contentLength =
+          (int)Math.max(1, random.nextGaussian() + columnSizeMean);
+      contents[i] = new byte[contentLength];
+      random.nextBytes(contents[i]);
+    }
+    return contents;
+  }
+
+  public boolean verify(Result result) {
+    if (result.isEmpty()) {
+      return false;
+    }
+    byte[] row = result.getRow();
+    byte[][] columns = getColumnQualifiers(row);
+    byte[][] contents = getContents(row, columns);
+    boolean[] verifiedColumns = new boolean[columns.length];
+
+    for (KeyValue kv : result.list()) {
+      byte[] qualifier = kv.getQualifier();
+      int index = qualifier[0] << 8 | qualifier[1];
+      if (!Arrays.equals(kv.getQualifier(), columns[index])) {
+        // The column qualifier did not match the expected qualifier.
+        return false;
+      } else {
+        if (!Arrays.equals(kv.getValue(), contents[index])) {
+          return false;
+        }
+        verifiedColumns[index] = true;
+      }
+    }
+    // Check that all columns were present in the result.
+    for (boolean verifiedColumn : verifiedColumns) {
+      if (!verifiedColumn) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public Get constructGet(long key, byte[] columnFamily) {
+    Get get = new Get(DataGenerator.md5PrefixedKey(key).getBytes());
+    get.addFamily(columnFamily);
+    return get;
+  }
+
+  public Put constructBulkPut(long key, byte[] columnFamily) {
+    byte[] row = DataGenerator.md5PrefixedKey(key).getBytes();
+    byte[][] qualifiers = getColumnQualifiers(row);
+    if (qualifiers.length == 0) {
+      return null;
+    }
+    byte[][] contents = getContents(row, qualifiers);
+    Put put = new Put(row);
+    for (int i = 0; i < qualifiers.length; i++) {
+      put.add(columnFamily, qualifiers[i], contents[i]);
+    }
+    return put;
+  }
+
+  public List<Put> constructPuts(long key, byte[] columnFamily) {
+    byte[] row = DataGenerator.md5PrefixedKey(key).getBytes();
+    byte[][] qualifiers = getColumnQualifiers(row);
+    if (qualifiers.length == 0) {
+      return null;
+    }
+    byte[][] contents = getContents(row, qualifiers);
+    List<Put> puts = new ArrayList<Put>(contents.length);
+    for (int i = 0; i < qualifiers.length; i++) {
+      Put put = new Put(row);
+      put.add(columnFamily, qualifiers[i], contents[i]);
+      puts.add(put);
+    }
+    return puts;
+  }
+
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/SeparateWorkloadGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/SeparateWorkloadGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/SeparateWorkloadGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/SeparateWorkloadGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,125 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+
+/**
+ * A workload that has separate threadpools for insert operations and get
+ * operations on each client.
+ */
+public class SeparateWorkloadGenerator extends Workload.Generator {
+
+  private static final byte[] columnFamily = "actions".getBytes();
+
+  private int insertThreads = 20;
+  private int insertOpsPerSecond = insertThreads * 1000000;
+  private int getThreads = 20;
+  private int getOpsPerSecond = getThreads * 1000000;
+  private double getVerificationFraction = 0.05;
+
+  public List<List<Workload>> generateWorkloads(int numWorkloads, String args) {
+
+    if (args != null) {
+      String[] splits = args.split(":");
+      if (splits.length != 5) {
+        throw new IllegalArgumentException("Wrong number of argument splits");
+      }
+      insertOpsPerSecond = Integer.parseInt(splits[0]);
+      insertThreads = Integer.parseInt(splits[1]);
+      getOpsPerSecond = Integer.parseInt(splits[2]);
+      getThreads = Integer.parseInt(splits[3]);
+      getVerificationFraction = Double.parseDouble(splits[4]);
+    }
+
+    List<List<Workload>> workloads =
+        new ArrayList<List<Workload>>(numWorkloads);
+    for (int i = 0; i < numWorkloads; i++) {
+      List<Workload> clientWorkloads = new ArrayList<Workload>();
+      long startKey = Long.MAX_VALUE / numWorkloads * i;
+      KeyCounter keysWritten = new KeyCounter(startKey);
+      clientWorkloads.add(new GetWorkload(getOpsPerSecond, getThreads,
+          getVerificationFraction, keysWritten));
+      clientWorkloads.add(new InsertWorkload(startKey, insertOpsPerSecond,
+          insertThreads, keysWritten));
+      workloads.add(clientWorkloads);
+    }
+    return workloads;
+  }
+
+  public HTableDescriptor getTableDescriptor() {
+    HTableDescriptor desc = new HTableDescriptor();
+    desc.addFamily(new HColumnDescriptor(columnFamily));
+    return desc;
+  }
+
+  public static class GetWorkload implements Workload {
+
+    private static final long serialVersionUID = 4077118754127556529L;
+    private int opsPerSecond;
+    private int numThreads;
+    private double getVerificationFraction;
+    private KeyCounter keysWritten;
+
+    public GetWorkload(int opsPerSecond, int numThreads,
+        double getVerificationFraction, KeyCounter keysWritten) {
+      this.opsPerSecond = opsPerSecond;
+      this.numThreads = numThreads;
+      this.getVerificationFraction = getVerificationFraction;
+      this.keysWritten = keysWritten;
+    }
+
+    public OperationGenerator constructGenerator() {
+      return new GetGenerator(columnFamily, keysWritten,
+          getVerificationFraction);
+    }
+
+    public int getNumThreads() {
+      return numThreads;
+    }
+
+    public int getOpsPerSecond() {
+      return opsPerSecond;
+    }
+
+    public EnumSet<Operation.Type> getOperationTypes() {
+      return EnumSet.of(Operation.Type.GET);
+    }
+  }
+
+  public static class InsertWorkload implements Workload {
+
+    private static final long serialVersionUID = -6717959795026317422L;
+    private long startKey;
+    private int opsPerSecond;
+    private int numThreads;
+    private KeyCounter keysWritten;
+
+    public InsertWorkload(long startKey, int opsPerSecond, int numThreads,
+        KeyCounter keysWritten) {
+      this.startKey = startKey;
+      this.opsPerSecond = opsPerSecond;
+      this.numThreads = numThreads;
+      this.keysWritten = keysWritten;
+    }
+
+    public OperationGenerator constructGenerator() {
+      return new PutGenerator(columnFamily, keysWritten, startKey, true);
+    }
+
+    public int getNumThreads() {
+      return numThreads;
+    }
+
+    public int getOpsPerSecond() {
+      return opsPerSecond;
+    }
+
+    public EnumSet<Operation.Type> getOperationTypes() {
+      return EnumSet.of(Operation.Type.BULK_PUT);
+    }
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/StatsCollector.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/StatsCollector.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/StatsCollector.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/StatsCollector.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,476 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.lang.management.ManagementFactory;
+import java.util.EnumSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Collects statistics on the performance of operations. StatsCollector is a
+ * singleton which manages all of the statistics for all operations within a
+ * client. The StatsCollector also provides reporters which make the collected
+ * statistics available through multiple outputs.
+ */
+public class StatsCollector {
+
+  private static final int CONSOLE_REPORTER_PERIOD_MS = 5000;
+  private static final int MRCOUNTER_REPORTER_PERIOD_MS = 5000;
+
+  private ConcurrentHashMap<Operation.Type, Stats> map;
+  private static final StatsCollector instance = new StatsCollector();
+
+  /**
+   * Get the singleton instance.
+   *
+   * @return the singleton instance
+   */
+  public static StatsCollector getInstance() {
+    return instance;
+  }
+
+  /**
+   * Private constructor to enforce singleton access.
+   */
+  private StatsCollector() {
+    this.map = new ConcurrentHashMap<Operation.Type, Stats>();
+  }
+
+  /**
+   * Get the Stats object for a certain type of operation.
+   *
+   * @param type
+   * @return the Stats object for the specified operation type
+   */
+  public Stats getStats(Operation.Type type) {
+    Stats stats = map.get(type);
+    if (stats == null) {
+      synchronized (map) {
+        // Double-check-locking is safe here because map is thread-safe.
+        if (map.get(type) == null) {
+          stats = new Stats(type);
+          map.put(type, stats);
+        }
+      }
+    }
+    return stats;
+  }
+
+  /**
+   * Ensure that Stats objects for the specified operation types are initialized
+   * and will be available to any reporters started after this call.
+   *
+   * @param types the set of operation types to initialize
+   */
+  public void initializeTypes(EnumSet<Operation.Type> types) {
+    for (Operation.Type type : types) {
+      getStats(type);
+    }
+  }
+
+  /**
+   * Start the MBean reporters for each of the initialized types of Stats. This
+   * should only be called once, and only after all Stats have been initialized.
+   */
+  public void startMBeanReporters() {
+    for (Stats stats : map.values()) {
+      new StatsMBeanReporter(stats);
+    }
+  }
+
+  /**
+   * Start a console reporter for each of the initialized types of Stats. This
+   * should only be called once, and only after all Stats have been initialized.
+   */
+  public void startConsoleReporters() {
+    for (Stats stats : map.values()) {
+      new StatsConsoleReporter(stats);
+    }
+  }
+
+  /**
+   * Start a mapreduce counter reporter for each of the initialized types of
+   * Stats. This should only be called once, and only after all Stats have been
+   * initialized.
+   *
+   * @param context the mapreduce context of which to update the counters
+   */
+  public void startMapReduceCounterReporter(
+      @SuppressWarnings("rawtypes") final TaskInputOutputContext context) {
+    for (Stats stats : map.values()) {
+      new StatsMapReduceCounterReporter(stats, context);
+    }
+  }
+
+  /**
+   * Represents the statistics collected about a particular type of operation.
+   */
+  public static class Stats {
+
+    private final Operation.Type type;
+
+    private final AtomicLong numKeys;
+    private final AtomicLong numKeysVerified;
+    private final AtomicLong numColumns;
+    private final AtomicLong numErrors;
+    private final AtomicLong numFailures;
+    private final AtomicLong cumulativeOpTime;
+
+    private final long startTime;
+
+    public Stats(Operation.Type type) {
+      this.type = type;
+
+      this.numKeys = new AtomicLong(0);
+      this.numKeysVerified = new AtomicLong(0);
+      this.numColumns = new AtomicLong(0);
+      this.numErrors = new AtomicLong(0);
+      this.numFailures = new AtomicLong(0);
+      this.cumulativeOpTime = new AtomicLong(0);
+
+      this.startTime = System.currentTimeMillis();
+    }
+
+    public Operation.Type getType() {
+      return this.type;
+    }
+
+    public long getNumKeys() {
+      return numKeys.get();
+    }
+
+    public void incrementKeys(long delta) {
+      numKeys.addAndGet(delta);
+    }
+
+    public long getNumKeysVerified() {
+      return numKeysVerified.get();
+    }
+
+    public void incrementKeysVerified(long delta) {
+      numKeysVerified.addAndGet(delta);
+    }
+
+    public long getNumColumns() {
+      return numColumns.get();
+    }
+
+    public void incrementColumns(long delta) {
+      numColumns.addAndGet(delta);
+    }
+
+    public long getNumErrors() {
+      return numErrors.get();
+    }
+
+    public void incrementErrors(long delta) {
+      numErrors.addAndGet(delta);
+    }
+
+    public long getNumFailures() {
+      return numFailures.get();
+    }
+
+    public void incrementFailures(long delta) {
+      numFailures.addAndGet(delta);
+    }
+
+    public long getCumulativeOpTime() {
+      return cumulativeOpTime.get();
+    }
+
+    public void incrementCumulativeOpTime(long delta) {
+      cumulativeOpTime.addAndGet(delta);
+    }
+
+    public long getStartTime() {
+      return startTime;
+    }
+  }
+
+  /**
+   * A snapshot of an operation's stats at a particular time, to facilitate
+   * reporting of the rate of change of certain statistics.
+   */
+  public static class StatsSnapshot {
+
+    protected final Stats stats;
+
+    protected final AtomicLong priorKeysPerSecondCumulativeKeys;
+    protected final AtomicLong priorKeysPerSecondTime;
+    protected final AtomicLong priorColumnsPerSecondCumulativeColumns;
+    protected final AtomicLong priorColumnsPerSecondTime;
+    protected final AtomicLong priorLatencyCumulativeKeys;
+    protected final AtomicLong priorLatencyCumulativeLatency;
+
+    public StatsSnapshot(Stats stats) {
+      long now = System.currentTimeMillis();
+      this.stats = stats;
+      this.priorKeysPerSecondCumulativeKeys = new AtomicLong(0);
+      this.priorKeysPerSecondTime = new AtomicLong(now);
+      this.priorColumnsPerSecondCumulativeColumns = new AtomicLong(0);
+      this.priorColumnsPerSecondTime = new AtomicLong(now);
+      this.priorLatencyCumulativeKeys = new AtomicLong(0);
+      this.priorLatencyCumulativeLatency = new AtomicLong(0);
+    }
+
+    public synchronized long getKeysPerSecond() {
+      long currentTime = System.currentTimeMillis();
+      long priorTime = priorKeysPerSecondTime.getAndSet(currentTime);
+      long currentKeys = stats.getNumKeys();
+      long priorKeys = priorKeysPerSecondCumulativeKeys.getAndSet(currentKeys);
+      long timeDelta = currentTime - priorTime;
+      if (timeDelta == 0) {
+        return 0;
+      }
+      return 1000 * (currentKeys - priorKeys) / timeDelta;
+    }
+
+    public synchronized long getColumnsPerSecond() {
+      long currentTime = System.currentTimeMillis();
+      long priorTime = priorColumnsPerSecondTime.getAndSet(currentTime);
+      long currentColumns = stats.getNumColumns();
+      long priorColumns = priorColumnsPerSecondCumulativeColumns
+          .getAndSet(currentColumns);
+      long timeDelta = currentTime - priorTime;
+      if (timeDelta == 0) {
+        return 0;
+      }
+      return 1000 * (currentColumns - priorColumns) / timeDelta;
+    }
+
+    public synchronized long getAverageLatency() {
+      long currentLatency = stats.getCumulativeOpTime();
+      long priorLatency = priorLatencyCumulativeLatency
+          .getAndSet(currentLatency);
+      long currentKeys = stats.getNumKeys();
+      long priorKeys = priorLatencyCumulativeKeys.getAndSet(currentKeys);
+      long keyDelta = currentKeys - priorKeys;
+      if (keyDelta == 0) {
+        return 0;
+      }
+      return (currentLatency - priorLatency) / keyDelta;
+    }
+
+    public long getCumulativeKeysPerSecond() {
+      long timeDelta = System.currentTimeMillis() - stats.getStartTime();
+      if (timeDelta == 0) {
+        return 0;
+      }
+      return 1000 * stats.getNumKeys() / timeDelta;
+    }
+
+    public long getCumulativeKeys() {
+      return stats.getNumKeys();
+    }
+
+    public long getCumulativeColumns() {
+      return stats.getNumColumns();
+    }
+
+    public long getCumulativeAverageLatency() {
+      if (stats.getNumKeys() == 0) {
+        return 0;
+      }
+      return stats.getCumulativeOpTime() / stats.getNumKeys();
+    }
+
+    public long getCumulativeErrors() {
+      return stats.getNumErrors();
+    }
+
+    public long getCumulativeOpFailures() {
+      return stats.getNumFailures();
+    }
+
+    public long getCumulativeKeysVerified() {
+      return stats.getNumKeysVerified();
+    }
+  }
+
+  /**
+   * Periodically reports an operation's stats to the standard output console.
+   */
+  private static class StatsConsoleReporter extends StatsSnapshot {
+
+    public static String formatTime(long elapsedTime) {
+      String format = String.format("%%0%dd", 2);
+      elapsedTime = elapsedTime / 1000;
+      String seconds = String.format(format, elapsedTime % 60);
+      String minutes = String.format(format, (elapsedTime % 3600) / 60);
+      String hours = String.format(format, elapsedTime / 3600);
+      String time =  hours + ":" + minutes + ":" + seconds;
+      return time;
+    }
+
+    public StatsConsoleReporter(final Stats stats) {
+      super(stats);
+      final long start = System.currentTimeMillis();
+
+      new Thread(new Runnable() {
+        public void run() {
+          while (true) {
+            System.out.println(formatTime(System.currentTimeMillis() - start) +
+                " [" + stats.getType().name + "] " +
+                "keys/s: [current: " + getKeysPerSecond() + " average: " +
+                getCumulativeKeysPerSecond() +"] " +
+                "latency: [current: " + getAverageLatency() + " average: " +
+                getCumulativeAverageLatency() + "] " +
+                "errors: " + getCumulativeErrors() +
+                " failures: " + getCumulativeOpFailures());
+            try {
+              Thread.sleep(CONSOLE_REPORTER_PERIOD_MS);
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      }).start();
+    }
+  }
+
+  /**
+   * Periodically reports an operation's stats to mapreduce counters. Only stats
+   * which are additive between clients will be reported.
+   */
+  private static class StatsMapReduceCounterReporter extends StatsSnapshot {
+    public StatsMapReduceCounterReporter(final Stats stats,
+        @SuppressWarnings("rawtypes") final TaskInputOutputContext context) {
+      super(stats);
+
+      new Thread(new Runnable() {
+        public void run() {
+          String typeName = stats.getType().name;
+          long oldCumulativeKeysPerSecond = 0;
+          long oldCurrentKeysPerSecond = 0;
+          long oldCumulativeKeys = 0;
+          long oldCumulativeFailures = 0;
+          long oldCumulativeErrors = 0;
+
+          while (true) {
+            long newCumulativeKeysPerSecond = getCumulativeKeysPerSecond();
+            context.getCounter("cumulativeKeysPerSecond", typeName).increment(
+                newCumulativeKeysPerSecond - oldCumulativeKeysPerSecond);
+            oldCumulativeKeysPerSecond = newCumulativeKeysPerSecond;
+
+            long newCurrentKeysPerSecond = getKeysPerSecond();
+            context.getCounter("currentKeysPerSecond", typeName).increment(
+                newCurrentKeysPerSecond - oldCurrentKeysPerSecond);
+            oldCurrentKeysPerSecond = newCurrentKeysPerSecond;
+
+            long newCumulativeKeys = stats.getNumKeys();
+            context.getCounter("cumulativeKeys", typeName).increment(
+                newCumulativeKeys - oldCumulativeKeys);
+            oldCumulativeKeys = newCumulativeKeys;
+
+            long newCumulativeFailures = getCumulativeOpFailures();
+            context.getCounter("cumulativeFailures", typeName).increment(
+                newCumulativeFailures - oldCumulativeFailures);
+            oldCumulativeFailures = newCumulativeFailures;
+
+            long newCumulativeErrors = getCumulativeErrors();
+            context.getCounter("cumulativeErrors", typeName).increment(
+                newCumulativeErrors - oldCumulativeErrors);
+            oldCumulativeErrors = newCumulativeErrors;
+
+            context.progress();
+
+            try {
+              Thread.sleep(MRCOUNTER_REPORTER_PERIOD_MS);
+            } catch(InterruptedException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      }).start();
+    }
+  }
+
+  /**
+   * Make an operation's stats available for query via an MBean. Any rate-based
+   * stats will be calculated for the time between successive invocations of the
+   * MBean interface.
+   */
+  private static class StatsMBeanReporter extends StatsSnapshot
+      implements StatsMBeanReporterMBean {
+
+    public StatsMBeanReporter(Stats stats) {
+      super(stats);
+
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      try {
+        ObjectName name = new ObjectName("LoadTester:name=" +
+            stats.getType().shortName);
+        mbs.registerMBean(this, name);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  public static interface StatsMBeanReporterMBean {
+    /**
+     * @return the average number of keys processed per second since the
+     *         previous invocation of this method
+     */
+    public long getKeysPerSecond();
+
+    /**
+     * @return the average number of columns processed per second since the
+     *         previous invocation of this method
+     */
+    public long getColumnsPerSecond();
+
+    /**
+     * @return the average latency of operations since the previous invocation
+     *         of this method
+     */
+    public long getAverageLatency();
+
+    /**
+     * @return the average number of keys processed per second since the
+     *         creation of this action
+     */
+    public long getCumulativeKeysPerSecond();
+
+    /**
+     * @return the total number of keys processed since the creation of this
+     *         action
+     */
+    public long getCumulativeKeys();
+
+    /**
+     * @return the total number of columns processed since the creation of this
+     *         action
+     */
+    public long getCumulativeColumns();
+
+    /**
+     * @return the average latency of operations since the creation of this
+     *         action
+     */
+    public long getCumulativeAverageLatency();
+
+    /**
+     * @return the total number of errors since the creation of this action
+     */
+    public long getCumulativeErrors();
+
+    /**
+     * @return the total number of operation failures since the creation of this
+     *         action
+     */
+    public long getCumulativeOpFailures();
+
+    /**
+     * @return the total number of keys verified since the creation of this
+     *         action
+     */
+    public long getCumulativeKeysVerified();
+
+  }
+}