Posted to commits@hbase.apache.org by ns...@apache.org on 2012/01/18 04:06:55 UTC
svn commit: r1232728 [1/2] - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce: ./ loadtest/
Author: nspiegelberg
Date: Wed Jan 18 03:06:54 2012
New Revision: 1232728
URL: http://svn.apache.org/viewvc?rev=1232728&view=rev
Log:
[HBASE-4916] New MR HBase load tester with pluggable workloads
Summary:
This is a new load tester for HBase that supports pluggable workloads
and more flexibility in configuring how the load is exercised. The load
tester runs as a native MapReduce job. The interfaces that define the
workloads can be extended for specific tests. The intensity of a load
can be controlled by the number of clients, the number of threads, and
the desired operations per second.
Test Plan: Has been run continuously on a dev cluster and on TITANMIGRATE001.
Reviewers: kranganathan, mbautin, liyintang
Reviewed By: liyintang
CC: hbase-eng@lists, erling, mbautin, cgist, liyintang, nspiegelberg
Differential Revision: 372837
Task ID: 741952
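Example invocation (a sketch; the quorum hosts, table name, and workload
arguments below are illustrative, assuming the jar is driven through the
mapreduce Driver, which registers this class under the name "loadtest"):

  hadoop jar hbase-*.jar loadtest \
    -zk zk1.example.com -zk zk2.example.com \
    -tasks 4 \
    -table test1 \
    -wl org.apache.hadoop.hbase.mapreduce.loadtest.MixedWorkloadGenerator \
    -wlargs 10000:20:1:1:0.05

-zk, -tasks and -wl are required; -table defaults to test1, -data to
RandomDataGenerator, and -jmx to 8991.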
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/CompositeOperationGenerator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/DataGenerator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetGenerator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetOperation.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/KeyCounter.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/LoadTest.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/MixedWorkloadGenerator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Operation.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/OperationGenerator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutGenerator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutOperation.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutReGenerator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/RandomDataGenerator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/SeparateWorkloadGenerator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/StatsCollector.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/VersionWorkloadGenerator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Workload.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java?rev=1232728&r1=1232727&r2=1232728&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java Wed Jan 18 03:06:54 2012
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.util.ProgramDriver;
+import org.apache.hadoop.hbase.mapreduce.loadtest.LoadTest;
/**
* Driver for hbase mapreduce jobs. Select which to run by passing
@@ -41,6 +42,7 @@ public class Driver {
"Complete a bulk data load.");
pgd.addClass(CopyTable.NAME, CopyTable.class,
"Export a table from local cluster to peer cluster");
+ pgd.addClass(LoadTest.NAME, LoadTest.class, "Load tester");
pgd.driver(args);
}
}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/CompositeOperationGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/CompositeOperationGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/CompositeOperationGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/CompositeOperationGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,96 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An operation generator which is a composite of other operation generators.
+ * Child generators will be assigned weights and will contribute operations with
+ * frequency proportional to their weight.
+ */
+public class CompositeOperationGenerator implements OperationGenerator {
+
+ // Maintain a list of generators where each child is referenced the same
+ // multiple times according to its weight. It is expected that this list will
+ // rarely be updated, making a copy-on-write implementation efficient.
+ private final CopyOnWriteArrayList<OperationGenerator> generators;
+
+ // The sequence number is incremented for each operation generated, and is
+ // used to determine from which child to generate the next operation.
+ private final AtomicLong nextSequence;
+
+ /**
+ * Create a new instance with no child generators.
+ */
+ public CompositeOperationGenerator() {
+ nextSequence = new AtomicLong(0);
+ generators = new CopyOnWriteArrayList<OperationGenerator>();
+ }
+
+ /**
+ * Add a child generator with a certain weight. The added child generator will
+ * be used to generate operations with frequency proportional to its weight
+ * relative to the total weight of all child generators of this instance.
+ * The weights are not required to sum to any particular value, but should
+ * be reduced by their greatest common divisor where possible, since each
+ * unit of weight adds another reference to the internal list.
+ *
+ * @param generator the child generator to be added
+ * @param weight the relative weight of the child generator
+ */
+ public void addGenerator(OperationGenerator generator, int weight) {
+ if (weight <= 0) {
+ throw new IllegalArgumentException("generator weights must be positive");
+ }
+ List<OperationGenerator> newGenerators =
+ new ArrayList<OperationGenerator>();
+ for (int i = 0; i < weight; i++) {
+ newGenerators.add(generator);
+ }
+ generators.addAll(newGenerators);
+ }
+
+ /**
+ * Remove a child generator from this instance. All child instances equal to
+ * the passed generator will be removed.
+ *
+ * @param generator the child generator to be removed
+ */
+ public void removeGenerator(OperationGenerator generator) {
+ while (generators.remove(generator));
+ }
+
+ /**
+ * Get the next operation in the combined sequence of operations of this
+ * instance's child generators. The returned operation may be null if there are
+ * no children or if the next operation could not be generated. If a child
+ * generator becomes exhausted as a result of calling nextOperation() on it,
+ * then it will be removed from this instance's children.
+ *
+ * @return the next operation to be executed, or null if the next operation
+ * could not be found
+ * @throws ExhaustedException if the last of this instance's child generators
+ * has itself become exhausted
+ */
+ public Operation nextOperation(DataGenerator dataGenerator)
+ throws ExhaustedException {
+ if (generators.size() == 0) {
+ throw new ExhaustedException();
+ }
+
+ OperationGenerator next = generators.get(
+ (int)(Math.abs(nextSequence.getAndIncrement() % generators.size())));
+
+ try {
+ return next.nextOperation(dataGenerator);
+ } catch (ExhaustedException e) {
+ removeGenerator(next);
+ return null;
+ } catch (IndexOutOfBoundsException e) {
+ return null;
+ } catch (ArithmeticException e) {
+ return null;
+ }
+ }
+}
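A usage sketch of the composite (the child generators and the dataGenerator
are assumed to exist; see the GetGenerator and PutGenerator added in this
commit):

  CompositeOperationGenerator composite = new CompositeOperationGenerator();
  composite.addGenerator(getGenerator, 3); // roughly 3 of every 4 operations
  composite.addGenerator(putGenerator, 1); // roughly 1 of every 4 operations
  // May return null transiently; throws ExhaustedException once all
  // children are exhausted and removed.
  Operation op = composite.nextOperation(dataGenerator);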
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/DataGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/DataGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/DataGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/DataGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,126 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.MD5Hash;
+
+/**
+ * The DataGenerator class and its subclasses define the relationship between
+ * rows and columns, and between columns and values. These classes are used to
+ * construct the contents to write to a table, as well as to verify the
+ * integrity of the same data when it is read back from the table.
+ */
+public abstract class DataGenerator {
+
+ /**
+ * Get the column qualifiers for a specified row. A row with 0 columns is
+ * allowed.
+ *
+ * @param row
+ * @return the column qualifiers
+ */
+ public abstract byte[][] getColumnQualifiers(byte[] row);
+
+ /**
+ * Get the contents of a single column in a row.
+ *
+ * @param row
+ * @param column
+ * @return the contents of the column
+ */
+ public abstract byte[] getContent(byte[] row, byte[] column);
+
+ /**
+ * Get the contents of all columns in a row.
+ *
+ * @param row
+ * @param columns
+ * @return the contents of all columns
+ */
+ public abstract byte[][] getContents(byte[] row, byte[][] columns);
+
+ /**
+ * Verify that the specified result is consistent, that it has the expected
+ * column names and values for the corresponding key.
+ *
+ * @param result the result from a get or scan operation to be verified
+ * @return true if the result is consistent
+ */
+ public abstract boolean verify(Result result);
+
+ /**
+ * Construct a get operation which will get all of the columns of a specified
+ * key from a specified column family.
+ *
+ * @param key the key within the load-tester key space
+ * @param columnFamily
+ * @return a get operation for the specified key and column family
+ */
+ public abstract Get constructGet(long key, byte[] columnFamily);
+
+ /**
+ * Construct a put for the specified key into the specified column family. The
+ * returned put will contain all of the key values to be inserted in a single
+ * operation. This may return null if the row should have 0 columns.
+ *
+ * @param key the key within the load-tester key space
+ * @param columnFamily
+ * @return a single put containing all of the key values
+ */
+ public abstract Put constructBulkPut(long key, byte[] columnFamily);
+
+ /**
+ * Construct puts for the specified key into the specified column family. Each
+ * key value will be in a distinct put. This may return null if the row should
+ * have 0 columns.
+ *
+ * @param key the key within the load-tester key space
+ * @param columnFamily
+ * @return a list of puts, with each put having a single key value
+ */
+ public abstract List<Put> constructPuts(long key, byte[] columnFamily);
+
+ /**
+ * Construct a row key from a long key. The key will be prefixed by the MD5
+ * hash of the long key to randomize the order of keys.
+ *
+ * @param key the long key which is to be prefixed
+ * @return the prefixed key
+ */
+ public static String md5PrefixedKey(long key) {
+ String stringKey = Long.toString(key);
+ String md5hash = MD5Hash.getMD5AsHex(Bytes.toBytes(stringKey));
+
+ // Prefix the key with its hash to randomize the ordering of keys.
+ return md5hash + ":" + stringKey;
+ }
+
+ /**
+ * Create a new instance of the specified DataGenerator subclass, given the
+ * specified arguments. The structure of the arguments depends on the
+ * implementation of the DataGenerator subclass.
+ *
+ * @param className the full class name of the DataGenerator to instantiate
+ * @param args the arguments required by the DataGenerator implementation
+ * @return a new instance of the specified DataGenerator class
+ * @throws RuntimeException if for any reason the DataGenerator could not be
+ * instantiated
+ */
+ public static DataGenerator newInstance(String className, String args) {
+ try {
+ @SuppressWarnings("unchecked")
+ Class<DataGenerator> theClass =
+ (Class<DataGenerator>)Class.forName(className);
+ Constructor<DataGenerator> constructor =
+ theClass.getDeclaredConstructor(String.class);
+ return (DataGenerator)constructor.newInstance(args);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
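A sketch of the reflective factory in use (RandomDataGenerator, added below,
parses a colon-separated argument string of mean column count, mean qualifier
size and mean value size):

  DataGenerator gen = DataGenerator.newInstance(
      "org.apache.hadoop.hbase.mapreduce.loadtest.RandomDataGenerator",
      "10:16:512");
  // Row keys are hash-prefixed to spread them across the key space, e.g.
  // md5PrefixedKey(42) yields the hex MD5 of "42", a colon, then "42".
  String row = DataGenerator.md5PrefixedKey(42);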
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,117 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.client.Get;
+
+/**
+ * Generates get operations for keys randomly chosen from those which have been
+ * marked as successfully completed. Typically this will be used with a
+ * KeyCounter that is shared by a generator of insert operations, to read keys
+ * which are known to have been inserted.
+ */
+public class GetGenerator implements OperationGenerator {
+
+ private KeyCounter keysWritten;
+ private byte[] columnFamily;
+ private Random random;
+ private double verifyFraction;
+ private int maxVersions;
+ private long minTime;
+ private long maxTime;
+ private long timeDelta;
+
+ /**
+ * Construct a generator for get operations which return a single version from
+ * any time range.
+ *
+ * @param columnFamily
+ * @param keysWritten the source of keys
+ * @param verifyFraction the fraction in [0,1] of operations to verify
+ */
+ public GetGenerator(byte[] columnFamily, KeyCounter keysWritten,
+ double verifyFraction) {
+ this(columnFamily, keysWritten, verifyFraction, 1, 0, Long.MAX_VALUE);
+ }
+
+ /**
+ * Construct a generator for get operations which should return at most a
+ * specified number of versions, each of which must be at most a specified
+ * age.
+ *
+ * @param columnFamily
+ * @param keysWritten the source of keys
+ * @param verifyFraction the fraction in [0,1] of operations to verify
+ * @param maxVersions the maximum number of versions to return
+ * @param timeDelta the maximum allowed age of a version, in milliseconds
+ */
+ public GetGenerator(byte[] columnFamily, KeyCounter keysWritten,
+ double verifyFraction, int maxVersions, long timeDelta) {
+ this(columnFamily, keysWritten, verifyFraction, maxVersions, 0, 0,
+ timeDelta);
+ }
+
+ /**
+ * Construct a generator for get operations which should return at most a
+ * specified number of versions, each of which must fall within a specified
+ * range of absolute timestamps.
+ *
+ * @param columnFamily
+ * @param keysWritten the source of keys
+ * @param verifyFraction the fraction in [0,1] of operations to verify
+ * @param maxVersions the maximum number of versions to return
+ * @param minTime the earliest allowable timestamp on a version
+ * @param maxTime the latest allowable timestamp on a version
+ */
+ public GetGenerator(byte[] columnFamily, KeyCounter keysWritten,
+ double verifyFraction, int maxVersions, long minTime, long maxTime) {
+ this(columnFamily, keysWritten, verifyFraction, maxVersions, minTime,
+ maxTime, 0);
+ }
+
+ /**
+ * Private constructor, used by public constructors to set all properties.
+ */
+ private GetGenerator(byte[] columnFamily, KeyCounter keysWritten,
+ double verifyFraction, int maxVersions, long minTime, long maxTime,
+ long timeDelta) {
+ this.keysWritten = keysWritten;
+ this.columnFamily = columnFamily;
+ this.random = new Random();
+ this.verifyFraction = verifyFraction;
+ this.maxVersions = maxVersions;
+ this.minTime = minTime;
+ this.maxTime = maxTime;
+ this.timeDelta = timeDelta;
+ }
+
+ /**
+ * Get the next operation. This may return null if no successful key could be
+ * found.
+ *
+ * @return the next get operation for a successful key, or null if none found
+ */
+ public Operation nextOperation(DataGenerator dataGenerator) {
+ try {
+ long key = keysWritten.getRandomKey();
+
+ Get get = dataGenerator.constructGet(key, columnFamily);
+ try {
+ get.setMaxVersions(maxVersions);
+ if (timeDelta > 0) {
+ long currentTime = System.currentTimeMillis();
+ get.setTimeRange(currentTime - timeDelta, currentTime);
+ } else {
+ get.setTimeRange(minTime, maxTime);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ boolean verify = random.nextDouble() < verifyFraction;
+ return new GetOperation(key, get, verify ? dataGenerator : null);
+ } catch (KeyCounter.NoKeysException e) {
+ return null;
+ }
+ }
+}
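A construction sketch (in practice the KeyCounter is shared with the
generator of inserts, as the class comment notes; the family name is
illustrative):

  KeyCounter keysWritten = new KeyCounter(0);
  byte[] family = "actions".getBytes();
  // Verify 10% of reads; request up to 3 versions written in the last minute.
  GetGenerator gets = new GetGenerator(family, keysWritten, 0.1, 3, 60 * 1000L);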
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetOperation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetOperation.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetOperation.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/GetOperation.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,59 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * An operation which reads a row from a table. The result of the operation is
+ * held internally and might be verified.
+ */
+public class GetOperation extends Operation {
+
+ private final long key;
+ private final Get get;
+ private Result result;
+ private DataGenerator dataGenerator;
+
+ public Operation.Type getType() {
+ return Operation.Type.GET;
+ }
+
+ /**
+ * Construct a new get operation.
+ *
+ * @param key the key to get
+ * @param get the populated get object
+ * @param dataGenerator the DataGenerator to use to verify the result, or null
+ * if the result should not be verified
+ */
+ public GetOperation(long key, Get get, DataGenerator dataGenerator) {
+ this.key = key;
+ this.get = get;
+ this.result = null;
+ this.dataGenerator = dataGenerator;
+ }
+
+ public void perform(HTable table) throws IOException {
+ result = table.get(get);
+ }
+
+ public void postAction() {
+ if (dataGenerator != null) {
+ if (!dataGenerator.verify(result)) {
+ StatsCollector.getInstance().getStats(getType()).incrementErrors(1);
+ System.err.println("Verification failed for key " + key);
+ }
+ }
+ }
+
+ public long getNumKeys() {
+ return 1;
+ }
+
+ public long getNumColumns() {
+ return result == null ? 0 : result.size();
+ }
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/KeyCounter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/KeyCounter.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/KeyCounter.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/KeyCounter.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,145 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.io.Serializable;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A class for tracking successful or failed keys. For keys in the space of
+ * longs that are traversed in a linear sequence of consecutive keys,
+ * instances of this class track which keys have been successfully processed
+ * and can return a random successful key.
+ *
+ * It is assumed that the number of keys which succeed is orders of magnitude
+ * greater than the number of keys which fail. The memory required by instances
+ * of this class is constant in the number of successful keys (assuming a bound
+ * on the duration for which a key can be in process) and linear in the number
+ * of failed keys.
+ */
+public class KeyCounter implements Serializable {
+
+ private static final long serialVersionUID = 8011474084218253286L;
+
+ // The number of recent successful keys which should be buffered for returning
+ // random successful keys if the standard approach fails.
+ private final static int NUM_RECENT_KEYS = 10000;
+
+ // The first key that is expected to be counted. All internal variables in the
+ // key space are stored after subtracting startKey from them, so that the
+ // internal key space always starts from 0 and is less likely to overflow.
+ private final long startKey;
+
+ // Store all keys that have failed.
+ private final ConcurrentHashMap<Long,Long> failedKeySet;
+
+ // Store some number of recently successful row keys so that there is a bound
+ // on the time it takes to get a random successful key. The map key is in the
+ // range [0, NUM_RECENT_KEYS) and the value is a row key that was recently
+ // successful.
+ private final ConcurrentHashMap<Long,Long> recentSuccessfulKeys;
+
+ // Used for inserting and overwriting recently successful keys in order of
+ // recentness.
+ private final AtomicLong numSuccessfulKeys;
+
+ // The greatest completed key contiguous to the range of completed keys
+ // starting with the startKey.
+ private final AtomicLong lastContiguousKey;
+
+ // A queue of keys which have been completed but are not contiguous to the
+ // completed range of keys starting with the startKey.
+ private final PriorityQueue<Long> keysCompleted;
+
+ // Used to randomly choose a completed successful key.
+ private final Random random;
+
+ /**
+ * Thrown if there are no completed successful keys.
+ */
+ public class NoKeysException extends Exception {
+ private static final long serialVersionUID = -7069323512531319455L;
+ }
+
+ /**
+ * Create a new KeyCounter which starts counting keys from startKey.
+ *
+ * @param startKey the first key expected to be counted
+ */
+ public KeyCounter(long startKey) {
+ this.startKey = startKey;
+ numSuccessfulKeys = new AtomicLong(0);
+ lastContiguousKey = new AtomicLong(-1);
+ keysCompleted = new PriorityQueue<Long>();
+ failedKeySet = new ConcurrentHashMap<Long,Long>();
+ recentSuccessfulKeys = new ConcurrentHashMap<Long, Long>(
+ (int)(1.1 * NUM_RECENT_KEYS), 0.75f, 100);
+ random = new Random();
+ }
+
+ /**
+ * Get a random key which was marked as successful.
+ *
+ * @return a random, successful key
+ * @throws NoKeysException if there have been no successful keys
+ */
+ public long getRandomKey() throws NoKeysException {
+ long last = lastContiguousKey.get();
+ if (last == -1) {
+ throw new NoKeysException();
+ }
+ // Compose a random long from two ints; widen before shifting (an int
+ // shifted by 32 is unchanged) and mask the low word against sign extension.
+ long key = ((long) random.nextInt()) << 32 | (random.nextInt() & 0xffffffffL);
+ if (last == 0) {
+ key = 0;
+ } else {
+ key = Math.abs(key % last);
+ }
+ if (failedKeySet.containsKey(key)) {
+ if (recentSuccessfulKeys.size() > 0) {
+ key = recentSuccessfulKeys.get(
+ Math.abs(key % recentSuccessfulKeys.size()));
+ } else {
+ throw new NoKeysException();
+ }
+ }
+ // Translate to the external key space.
+ return key + startKey;
+ }
+
+ /**
+ * Mark a specified key as processed and either successful or failed.
+ *
+ * @param key the key which has been processed
+ * @param success true if the processing succeeded
+ */
+ public void markKey(long key, boolean success) {
+ // Translate to the internal key space.
+ key -= startKey;
+
+ if (success) {
+ recentSuccessfulKeys.put(
+ numSuccessfulKeys.getAndIncrement() % NUM_RECENT_KEYS, key);
+ } else {
+ failedKeySet.put(key, key);
+ }
+
+ synchronized (keysCompleted) {
+ if (key > lastContiguousKey.get() + 1) {
+ keysCompleted.offer(key);
+ } else if (key == lastContiguousKey.get() + 1) {
+ long last = key;
+ Long next;
+ while ((next = keysCompleted.peek()) != null &&
+ next.longValue() == last + 1) {
+ keysCompleted.poll();
+ last++;
+ }
+ lastContiguousKey.set(last);
+ } else {
+ // This key should already be represented in this counter.
+ return;
+ }
+ }
+ }
+}
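A minimal sketch of the intended protocol (writers mark keys as they
complete; readers draw random successful keys):

  KeyCounter counter = new KeyCounter(1000L); // external keys start at 1000
  counter.markKey(1000L, true);  // succeeded
  counter.markKey(1001L, false); // failed; will not be handed back out
  try {
    long key = counter.getRandomKey(); // returns 1000 here
  } catch (KeyCounter.NoKeysException e) {
    // no successful keys have been recorded yet
  }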
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/LoadTest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/LoadTest.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/LoadTest.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/LoadTest.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,299 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class LoadTest extends Configured implements Tool {
+
+ public static final String NAME = "loadtest";
+
+ private static final int DEFAULT_JMX_PORT = 8991;
+
+ private static final String WORKLOADGENERATOR =
+ "hbase.loadtester.workloadgenerator";
+
+ private static final String WORKLOADGENERATOR_ARGS =
+ "hbase.loadtester.generator.args";
+
+ private static final String TABLENAME = "hbase.loadtester.tablename";
+
+ private static final String DEFAULT_TABLENAME = "test1";
+
+ private static final String DATAGENERATOR =
+ "hbase.loadtester.datagenerator";
+
+ private static final String DATAGENERATOR_ARGS =
+ "hbase.loadtester.datagenerator.args";
+
+ private static final String DEFAULT_DATAGENERATOR =
+ RandomDataGenerator.class.getName();
+
+ public static class Map
+ extends Mapper<LongWritable, Text, LongWritable, Text> {
+
+ // The mapper should only be executed once. Ensure that each mapper only
+ // sends workloads once by flagging the first pass of each mapper. The
+ // driver will be responsible for ensuring that there is only one mapper.
+ private boolean firstPass;
+
+ public Map() {
+ this.firstPass = true;
+ }
+
+ public void map(LongWritable key, Text value,
+ Mapper<LongWritable, Text, LongWritable, Text>.Context context)
+ throws IOException, InterruptedException {
+ if (firstPass) {
+ // Determine from the job configuration which workloads to generate.
+ String className = context.getConfiguration().get(WORKLOADGENERATOR);
+ Workload.Generator workloadGenerator =
+ Workload.Generator.newInstance(className);
+
+ // Determine from the job configuration the name of the HBase table to
+ // use, and create that table according to the requirements of the
+ // workload generator.
+ String tableName = context.getConfiguration().get(TABLENAME,
+ DEFAULT_TABLENAME);
+ workloadGenerator.setupTable(configurationFromZooKeeper(
+ context.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM)),
+ tableName);
+
+ // Determine from the job configuration the arguments for the workload
+ // generator.
+ String generatorArgs =
+ context.getConfiguration().get(WORKLOADGENERATOR_ARGS);
+
+ // Generate the workloads for each of the reducer tasks.
+ List<List<Workload>> workloads =
+ workloadGenerator.generateWorkloads(context.getNumReduceTasks(),
+ generatorArgs);
+
+ // Serialize the workloads assigned to each reducer, and write them out.
+ // Multiple workloads for the same client may share common objects, so
+ // they must be serialized together into a single blob.
+ for (int i = 0; i < workloads.size(); i++) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(baos);
+ out.writeObject(workloads.get(i));
+ out.close();
+
+ // The key is used to partition the workloads to different reducers.
+ context.write(new LongWritable(i), new Text(baos.toByteArray()));
+ }
+ firstPass = false;
+ }
+ }
+ }
+
+ public static class Reduce
+ extends Reducer<LongWritable, Text, LongWritable, Text> {
+
+ public void reduce(LongWritable key, Iterable<Text> values,
+ Reducer<LongWritable, Text, LongWritable, Text>.Context context)
+ throws IOException, InterruptedException {
+ HBaseConfiguration conf = configurationFromZooKeeper(
+ context.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
+
+ Operation.setConfAndTableName(conf, context.getConfiguration().get(
+ TABLENAME, DEFAULT_TABLENAME).getBytes());
+
+ DataGenerator dataGenerator = DataGenerator.newInstance(
+ context.getConfiguration().get(DATAGENERATOR, DEFAULT_DATAGENERATOR),
+ context.getConfiguration().get(DATAGENERATOR_ARGS));
+
+ StatsCollector statsCollector = StatsCollector.getInstance();
+
+ List<Workload.Executor> executors = new ArrayList<Workload.Executor>();
+
+ for (Text value : values) {
+ try {
+ // Deserialize the workloads from the input.
+ ObjectInputStream in = new ObjectInputStream(
+ new ByteArrayInputStream(value.getBytes()));
+
+ @SuppressWarnings("unchecked")
+ List<Workload> workloads = (List<Workload>)in.readObject();
+ for (Workload workload : workloads) {
+ // Run each workload in its own executor, with dedicated thread
+ // pools per workload.
+ Workload.Executor executor =
+ new Workload.Executor(workload, dataGenerator);
+ executors.add(executor);
+ executor.start();
+
+ // Initialize the stats for each of the types of operations used
+ // by this workload.
+ statsCollector.initializeTypes(workload.getOperationTypes());
+ }
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ // Start the reporters for the initialized types of operations.
+ statsCollector.startMBeanReporters();
+ statsCollector.startConsoleReporters();
+ statsCollector.startMapReduceCounterReporter(context);
+
+ // Wait for the executors to finish. They should never actually finish,
+ // but if the inputs are exhausted then the reducer will terminate and
+ // the job will finish prematurely.
+ for (Workload.Executor executor : executors) {
+ executor.waitForFinish();
+ }
+ }
+ }
+
+ public static class Partition extends Partitioner<LongWritable, Text> {
+ public int getPartition(LongWritable key, Text value, int numPartitions) {
+ // Keys should be uniformly distributed across reducers.
+ return (int)(key.get() % numPartitions);
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ public static HBaseConfiguration configurationFromZooKeeper(String zkQuorum) {
+ Configuration conf = new Configuration();
+ conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
+ // This constructor is deprecated, but there seems to be no other way to get
+ // an HBaseConfiguration instance, and casting from Configuration fails at
+ // runtime.
+ return new HBaseConfiguration(conf);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Configuration conf = this.getConf();
+
+ String[] otherArgs =
+ new GenericOptionsParser(conf, args).getRemainingArgs();
+ Options options = new Options();
+ options.addOption("zk", true, "ZooKeeper quorum");
+ options.addOption("tasks", true, "Number of client tasks");
+ options.addOption("table", true, "HBase table name");
+ options.addOption("wl", true, "WorkloadGenerator class name");
+ options.addOption("wlargs", true, "WorkloadGenerator arguments");
+ options.addOption("data", true, "DataGenerator class name");
+ options.addOption("dataargs", true, "DataGenerator arguments");
+ options.addOption("jmx", true, "Port number for remote JMX");
+
+ CommandLine cmd = new GnuParser().parse(options, otherArgs);
+ if (cmd.hasOption("zk")) {
+ String quorum = "";
+ for (String node : cmd.getOptionValues("zk")) {
+ quorum += node + ",";
+ }
+ conf.set(HConstants.ZOOKEEPER_QUORUM,
+ quorum.substring(0, quorum.length() - 1));
+ } else {
+ System.err.println("ZooKeeper quorum must be specified");
+ System.exit(1);
+ }
+
+ int numTasks = 1;
+ if (cmd.hasOption("tasks")) {
+ numTasks = Integer.parseInt(cmd.getOptionValue("tasks"));
+ if (numTasks <= 0) {
+ System.err.println("The number of client tasks must be positive");
+ System.exit(1);
+ }
+ } else {
+ System.err.println("The number of client tasks must be specified");
+ System.exit(1);
+ }
+
+ if (cmd.hasOption("table")) {
+ conf.set(TABLENAME, cmd.getOptionValue("table"));
+ }
+
+ if (cmd.hasOption("wl")) {
+ conf.set(WORKLOADGENERATOR, cmd.getOptionValue("wl"));
+ } else {
+ System.err.println("No workloads specified");
+ System.exit(1);
+ }
+
+ if (cmd.hasOption("wlargs")) {
+ conf.set(WORKLOADGENERATOR_ARGS, cmd.getOptionValue("wlargs"));
+ }
+
+ if (cmd.hasOption("data")) {
+ conf.set(DATAGENERATOR, cmd.getOptionValue("data"));
+ }
+
+ if (cmd.hasOption("dataargs")) {
+ conf.set(DATAGENERATOR_ARGS, cmd.getOptionValue("dataargs"));
+ }
+
+ int jmxPort = DEFAULT_JMX_PORT;
+ if (cmd.hasOption("jmx")) {
+ jmxPort = Integer.parseInt(cmd.getOptionValue("jmx"));
+ }
+ conf.set("mapred.child.java.opts",
+ "-Dcom.sun.management.jmxremote.authenticate=false" +
+ " -Dcom.sun.management.jmxremote.ssl=false" +
+ " -Dcom.sun.management.jmxremote.port=" + jmxPort);
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
+ String currentDate = dateFormat.format(new Date());
+
+ // Create a single input file with a single key-value pair. The contents
+ // of the input file are irrelevant; the important constraint is that
+ // there is only one input file, so that only one mapper runs. The input and
+ // output paths are date-specific to avoid conflict, though no output is
+ // expected.
+ FileSystem fs = FileSystem.get(conf);
+ Path inputPath = new Path("/tmp/input/" + currentDate);
+ Path outputPath = new Path("/tmp/output/" + currentDate);
+ OutputStream out = fs.create(inputPath);
+ out.write("0\t0".getBytes());
+ out.close();
+
+ Job job = new Job(conf, "LoadTester_" + currentDate + "_" +
+ Class.forName(conf.get(WORKLOADGENERATOR)).getSimpleName());
+ job.setJarByClass(LoadTest.class);
+ job.setMapperClass(Map.class);
+ job.setReducerClass(Reduce.class);
+ job.setPartitionerClass(Partition.class);
+ job.setOutputKeyClass(LongWritable.class);
+ job.setOutputValueClass(Text.class);
+ job.setNumReduceTasks(numTasks);
+ FileInputFormat.addInputPath(job, inputPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int result = ToolRunner.run(new LoadTest(), args);
+ System.exit(result);
+ }
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/MixedWorkloadGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/MixedWorkloadGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/MixedWorkloadGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/MixedWorkloadGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,100 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+
+/**
+ * A generator which mixes insert and get operations in a single thread pool,
+ * such that the ratio of operation types is strictly enforced.
+ */
+public class MixedWorkloadGenerator extends Workload.Generator {
+
+ private static final byte[] columnFamily = "actions".getBytes();
+
+ private int opsPerSecond = Integer.MAX_VALUE;
+ private int numThreads = 20;
+ private int insertWeight = 1;
+ private int getWeight = 1;
+ private double getVerificationFraction = 0.05;
+
+ public List<List<Workload>> generateWorkloads(int numWorkloads, String args) {
+ if (args != null) {
+ String[] splits = args.split(":");
+ if (splits.length != 5) {
+ throw new IllegalArgumentException("Wrong number of argument splits");
+ }
+ opsPerSecond = Integer.parseInt(splits[0]);
+ numThreads = Integer.parseInt(splits[1]);
+ insertWeight = Integer.parseInt(splits[2]);
+ getWeight = Integer.parseInt(splits[3]);
+ getVerificationFraction = Double.parseDouble(splits[4]);
+ }
+
+ List<List<Workload>> workloads = new ArrayList<List<Workload>>(numWorkloads);
+ for (int i = 0; i < numWorkloads; i++) {
+ List<Workload> clientWorkloads = new ArrayList<Workload>();
+ long startKey = Long.MAX_VALUE / numWorkloads * i;
+ clientWorkloads.add(new MixedWorkload(startKey, opsPerSecond, numThreads,
+ insertWeight, getWeight, getVerificationFraction));
+ workloads.add(clientWorkloads);
+ }
+ return workloads;
+ }
+
+ public HTableDescriptor getTableDescriptor() {
+ HTableDescriptor desc = new HTableDescriptor();
+ desc.addFamily(new HColumnDescriptor(columnFamily));
+ return desc;
+ }
+
+ public static class MixedWorkload implements Workload {
+
+ private static final long serialVersionUID = 576338016524909210L;
+ private long startKey;
+ private int opsPerSecond;
+ private int numThreads;
+ private int insertWeight;
+ private int getWeight;
+ private double getVerificationFraction;
+
+ public MixedWorkload(long startKey, int opsPerSecond, int numThreads,
+ int insertWeight, int getWeight, double getVerificationFraction) {
+ this.startKey = startKey;
+ this.opsPerSecond = opsPerSecond;
+ this.numThreads = numThreads;
+ this.insertWeight = insertWeight;
+ this.getWeight = getWeight;
+ this.getVerificationFraction = getVerificationFraction;
+ }
+
+ public OperationGenerator constructGenerator() {
+ KeyCounter keysWritten = new KeyCounter(startKey);
+ PutGenerator insertGenerator =
+ new PutGenerator(columnFamily, keysWritten, startKey, true);
+ GetGenerator getGenerator =
+ new GetGenerator(columnFamily, keysWritten, getVerificationFraction);
+
+ CompositeOperationGenerator compositeGenerator =
+ new CompositeOperationGenerator();
+ compositeGenerator.addGenerator(insertGenerator, insertWeight);
+ compositeGenerator.addGenerator(getGenerator, getWeight);
+ return compositeGenerator;
+ }
+
+ public int getNumThreads() {
+ return numThreads;
+ }
+
+ public int getOpsPerSecond() {
+ return opsPerSecond;
+ }
+
+ public EnumSet<Operation.Type> getOperationTypes() {
+ return EnumSet.of(Operation.Type.BULK_PUT, Operation.Type.GET);
+ }
+ }
+}
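The generator's argument string is five colon-separated fields in the order
parsed above. For example (values are illustrative):

  -wlargs 5000:40:1:3:0.05

runs each client at up to 5000 operations per second on 40 threads, issuing
one insert for every three gets and verifying 5% of the gets.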
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Operation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Operation.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Operation.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Operation.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,129 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.loadtest.StatsCollector.Stats;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+
+/**
+ * Represents an action that can be performed against an HBase HTable. The HBase
+ * configuration and table are defined statically. The Operation class maintains
+ * thread-local references to HTables, allowing multiple concurrent operations
+ * to be performed against the same table. Subclasses of Operation are not
+ * required to be thread-safe, and should not be accessed from multiple threads
+ * concurrently.
+ */
+public abstract class Operation implements Runnable {
+
+ private static volatile HBaseConfiguration conf;
+ private static volatile byte[] tableName;
+
+ /**
+ * HTable instances are not thread-safe, so they need to be encapsulated in a
+ * ThreadLocal variable.
+ */
+ private static final ThreadLocal<HTable> table = new ThreadLocal<HTable>() {
+ @Override
+ protected HTable initialValue() {
+ try {
+ return new HTable(conf, tableName);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+ };
+
+ /**
+ * Initialize the HBase configuration and table name against which all
+ * operations will be performed. This must be done before any operation is
+ * executed.
+ *
+ * @param conf the HBase configuration to use
+ * @param tableName the name of the HBase table against which to operate
+ */
+ public static void setConfAndTableName(
+ HBaseConfiguration conf, byte[] tableName) {
+ Operation.conf = conf;
+ Operation.tableName = tableName;
+ }
+
+ /**
+ * The type of operation, used for reporting and statistics.
+ */
+ public enum Type {
+ GET("get", "R"),
+ BULK_PUT("bulk-put", "W"),
+ PUT("put", "W");
+
+ public final String name;
+ public final String shortName;
+
+ private Type(String name, String shortName) {
+ this.name = name;
+ this.shortName = shortName;
+ }
+ };
+
+ /**
+ * Get the type of the operation represented by the subclass.
+ * @return the type of the operation
+ */
+ public abstract Type getType();
+
+ /**
+ * Perform this operation against a specific HBase table.
+ *
+ * @param table HBase table against which to perform the operation
+ * @throws IOException
+ */
+ public abstract void perform(HTable table) throws IOException;
+
+ /**
+ * Do any cleanup or verification required after the operation is performed.
+ * This will be called after the operation is performed, regardless of
+ * whether the operation succeeded or threw an exception.
+ */
+ public abstract void postAction();
+
+ /**
+ * Get the number of keys affected by this operation. For some operations,
+ * which may be part of a group of operations acting on a single key, this
+ * method may return 0 for all but the last part to complete.
+ *
+ * @return the number of keys affected by this operation
+ */
+ public abstract long getNumKeys();
+
+ /**
+ * Get the number of columns affected by this operation.
+ *
+ * @return the number of columns affected by this operation
+ */
+ public abstract long getNumColumns();
+
+ /**
+ * Performs this operation against the table.
+ */
+ public void run() {
+ StatsCollector.Stats stats =
+ StatsCollector.getInstance().getStats(this.getType());
+ long start = System.currentTimeMillis();
+
+ try {
+ perform(table.get());
+ } catch (IOException e) {
+ stats.incrementFailures(1);
+ } catch (Exception e) {
+ e.printStackTrace();
+ stats.incrementFailures(1);
+ } finally {
+ long elapsed = System.currentTimeMillis() - start;
+ stats.incrementCumulativeOpTime(elapsed);
+ stats.incrementKeys(getNumKeys());
+ stats.incrementColumns(getNumColumns());
+ postAction();
+ }
+ }
+}
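Before any operation runs, the shared configuration and table name must be
installed once per JVM, as the Reduce task in LoadTest.java does. A sketch
(quorum and table name are illustrative):

  Operation.setConfAndTableName(
      LoadTest.configurationFromZooKeeper("zk1.example.com"),
      "test1".getBytes());
  // Each worker thread then lazily binds its own HTable through the
  // ThreadLocal above the first time it runs an Operation.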
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/OperationGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/OperationGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/OperationGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/OperationGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,36 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+/**
+ * OperationGenerators provide a sequence of operations according to some rules
+ * or patterns defined by the implementation.
+ */
+public interface OperationGenerator {
+
+ /**
+ * Get the next operation to be executed. This method may occasionally return
+ * null if there was a transient issue in generating the operation. Returning
+ * null is not an error and should simply be ignored. This allows other
+ * OperationGenerator instances an opportunity to return operations instead of
+ * waiting for any particular instance to return its next valid operation.
+ *
+ * Implementations may be intended to only provide a certain number of
+ * operations. Those implementations will thereafter throw ExhaustedExceptions
+ * when nextOperation() is invoked on them. Any instance which has thrown an
+ * ExhaustedException should no longer be asked for more operations, unless
+ * implementation-specific measures are used to reset that instance.
+ *
+ * @param dataGenerator DataGenerator used to construct the next operation
+ * @return the next operation, or null
+ * @throws ExhaustedException if the instance has permanently run out of operations
+ */
+ public Operation nextOperation(DataGenerator dataGenerator)
+ throws ExhaustedException;
+
+ /**
+ * Thrown when an instance has run out of operations to return.
+ */
+ public static class ExhaustedException extends Exception {
+ private static final long serialVersionUID = -5647515931122719787L;
+ }
+
+}
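A sketch of a conforming implementation: a wrapper that permanently exhausts
after a fixed number of operations (illustrative only, not part of this
commit):

  public class LimitedOperationGenerator implements OperationGenerator {
    private final OperationGenerator delegate;
    private final java.util.concurrent.atomic.AtomicLong remaining;

    public LimitedOperationGenerator(OperationGenerator delegate, long limit) {
      this.delegate = delegate;
      this.remaining = new java.util.concurrent.atomic.AtomicLong(limit);
    }

    public Operation nextOperation(DataGenerator dataGenerator)
        throws ExhaustedException {
      if (remaining.getAndDecrement() <= 0) {
        throw new ExhaustedException(); // permanently out of operations
      }
      return delegate.nextOperation(dataGenerator); // may itself be null
    }
  }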
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,104 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.client.Put;
+
+/**
+ * Generates put operations, either bulk inserts or single key-value inserts.
+ * Operations are generated for consecutive increasing keys within the load
+ * tester key space, starting from a specified key.
+ */
+public class PutGenerator implements OperationGenerator {
+
+ protected final AtomicLong nextKey;
+ protected final byte[] columnFamily;
+ protected final KeyCounter keysWritten;
+
+ // If queue is null, then bulk inserts will be generated, otherwise, inserts
+ // of single KeyValues will be generated.
+ private final LinkedBlockingQueue<PutOperation> queue;
+
+ /**
+ * Default constructor.
+ *
+ * @param columnFamily
+ * @param keysWritten the sink to be populated with written keys
+ * @param startKey the first key to insert
+ * @param bulkInsert if true, operations will each insert multiple key-values
+ */
+ public PutGenerator(byte[] columnFamily, KeyCounter keysWritten,
+ long startKey, boolean bulkInsert) {
+ nextKey = new AtomicLong(startKey);
+ this.columnFamily = columnFamily;
+ this.keysWritten = keysWritten;
+ if (bulkInsert) {
+ this.queue = null;
+ } else {
+ this.queue = new LinkedBlockingQueue<PutOperation>();
+ }
+ }
+
+ public Operation nextOperation(DataGenerator dataGenerator) {
+ // Loop until a put operation is available, or return null if no keys remain.
+ while (true) {
+ // Check if there are queued operations which were parts of previous puts.
+ if (queue != null) {
+ PutOperation head = queue.poll();
+ if (head != null) {
+ return head;
+ }
+ }
+
+ try {
+ long key = getNextKey();
+ PutOperation put;
+ if (queue != null) {
+ put = getPut(dataGenerator, key);
+ } else {
+ put = getBulkPut(dataGenerator, key);
+ }
+ if (put != null) {
+ return put;
+ }
+ // The next key was not supposed to be written, so try again.
+ } catch (KeyCounter.NoKeysException e) {
+ // There were no keys to be written, do not try again.
+ return null;
+ }
+ }
+ }
+
+ protected long getNextKey() throws KeyCounter.NoKeysException {
+ return nextKey.getAndIncrement();
+ }
+
+ protected PutOperation getBulkPut(DataGenerator dataGenerator, long key) {
+ Put put = dataGenerator.constructBulkPut(key, columnFamily);
+ if (put != null) {
+ return new PutOperation(key, put, keysWritten);
+ } else {
+ // Key was defined to be skipped, mark it as complete so it can be read.
+ keysWritten.markKey(key, true);
+ return null;
+ }
+ }
+
+ protected PutOperation getPut(DataGenerator dataGenerator, long key) {
+ List<Put> puts = dataGenerator.constructPuts(key, columnFamily);
+ if (puts != null) {
+ AtomicInteger remainingParts = new AtomicInteger(puts.size());
+ for (int i = 1; i < puts.size(); i++) {
+ queue.offer(new PutOperation(key, puts.get(i), keysWritten,
+ remainingParts));
+ }
+ return new PutOperation(key, puts.get(0), keysWritten, remainingParts);
+ } else {
+ keysWritten.markKey(key, true);
+ return null;
+ }
+ }
+}
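A construction sketch (the family name is illustrative; the KeyCounter is
typically shared with a GetGenerator):

  KeyCounter keysWritten = new KeyCounter(0);
  // true: one bulk Put per key; false: one Put per key-value, with the
  // remaining parts queued internally and returned by later calls.
  PutGenerator puts =
      new PutGenerator("actions".getBytes(), keysWritten, 0, true);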
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutOperation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutOperation.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutOperation.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutOperation.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,90 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+
+/**
+ * An operation that inserts one or more key-values.
+ */
+public class PutOperation extends Operation {
+
+ private final Put put;
+ private final long key;
+ private boolean success;
+ private final KeyCounter writtenKeys;
+
+ // Multi-part operations need to coordinate between themselves so that only
+ // one of the parts makes certain updates. Related parts share an atomic
+ // counter, allowing them to determine which part was last to complete.
+ private final AtomicInteger partsRemaining;
+ private boolean wasLast;
+
+ /**
+ * Create a single-part (bulk insert) put operation.
+ *
+ * @param key the load-tester key space key being inserted
+ * @param put
+ * @param writtenKeys the sink of keys to be updated after execution
+ */
+ public PutOperation(long key, Put put, KeyCounter writtenKeys) {
+ this(key, put, writtenKeys, null);
+ }
+
+ /**
+ * Create a multi-part (non-bulk insert) put operation.
+ *
+ * @param key
+ * @param put
+ * @param partsRemaining shared counter of parts remaining to execute
+ * @param writtenKeys the sink of keys to be updated after execution
+ */
+ public PutOperation(long key, Put put, KeyCounter writtenKeys,
+ AtomicInteger partsRemaining) {
+ this.put = put;
+ this.key = key;
+ this.writtenKeys = writtenKeys;
+ this.success = false;
+
+ this.partsRemaining = partsRemaining;
+ this.wasLast = false;
+ }
+
+ public Operation.Type getType() {
+ return (partsRemaining == null)
+ ? Operation.Type.BULK_PUT
+ : Operation.Type.PUT;
+ }
+
+ public void perform(HTable table) throws IOException {
+ table.put(put);
+ success = true;
+ if (partsRemaining != null) {
+ // If this was not a bulk insert, determine if it was the last part.
+ wasLast = (partsRemaining.decrementAndGet() == 0);
+ }
+ }
+
+ /**
+ * Mark the key as completed so it can be used in get requests.
+ */
+ public void postAction() {
+ if (partsRemaining == null || wasLast) {
+ // Only mark the key complete if it was a bulk insert or the last part of
+ // several related inserts for the same key.
+ writtenKeys.markKey(key, success);
+ }
+ }
+
+ public long getNumKeys() {
+ // Return 1 only if this was the only (bulk) insert or if it was the last of
+ // several related inserts for the same key.
+ return (partsRemaining == null || wasLast) ? 1 : 0;
+ }
+
+ public long getNumColumns() {
+ return put.size();
+ }
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutReGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutReGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutReGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/PutReGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,30 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+/**
+ * Generates puts for random keys which have already been successfully
+ * processed by something else. This is typically used for overwriting keys
+ * which were previously inserted by another generator, though this is not a
+ * requirement.
+ */
+public class PutReGenerator extends PutGenerator {
+
+ /**
+ * Default constructor.
+ *
+ * @param columnFamily
+ * @param keysWritten the source of keys to write and sink of keys written
+ * @param bulkInsert if true, operations will each insert multiple key-values
+ */
+ public PutReGenerator(byte[] columnFamily, KeyCounter keysWritten,
+ boolean bulkInsert) {
+ super(columnFamily, keysWritten, 0, bulkInsert);
+ }
+
+ /**
+ * Override superclass behavior to get a random key instead of the next
+ * sequential key.
+ */
+ protected long getNextKey() throws KeyCounter.NoKeysException {
+ return keysWritten.getRandomKey();
+ }
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/RandomDataGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/RandomDataGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/RandomDataGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/RandomDataGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,172 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * A DataGenerator that pseudo-randomly chooses the number, qualifiers and
+ * contents of columns. PRNGs are seeded in a repeatable way per row key.
+ */
+public class RandomDataGenerator extends DataGenerator {
+
+ private static final int MIN_COLUMNS = 1;
+ private static final int MAX_COLUMNS = 65535;
+
+ private static final double DEFAULT_NUM_COLUMNS_MEAN = 10;
+ private static final double DEFAULT_QUALIFIER_SIZE_MEAN = 16;
+ private static final double DEFAULT_COLUMN_SIZE_MEAN = 512;
+
+ private final double numColumnsMean;
+ private final double qualifierSizeMean;
+ private final double columnSizeMean;
+
+ /**
+ * Constructs a new RandomDataGenerator, parsing properties from the passed
+ * arguments if any, using defaults otherwise.
+ *
+ * @param args
+ */
+ public RandomDataGenerator(String args) {
+ if (args != null) {
+ String[] splits = args.split(":");
+ if (splits.length != 3) {
+ throw new IllegalArgumentException(
+ "expected numColumnsMean:qualifierSizeMean:columnSizeMean");
+ }
+ numColumnsMean = Double.parseDouble(splits[0]);
+ qualifierSizeMean = Double.parseDouble(splits[1]);
+ columnSizeMean = Double.parseDouble(splits[2]);
+ } else {
+ numColumnsMean = DEFAULT_NUM_COLUMNS_MEAN;
+ qualifierSizeMean = DEFAULT_QUALIFIER_SIZE_MEAN;
+ columnSizeMean = DEFAULT_COLUMN_SIZE_MEAN;
+ }
+ }
+
+ /**
+ * Chooses a random number of columns between MIN_COLUMNS and MAX_COLUMNS
+ * (both inclusive) with a Gaussian distribution (unit standard deviation)
+ * around numColumnsMean. The length of each column qualifier is chosen the
+ * same way around qualifierSizeMean. The first two bytes of each column
+ * qualifier are the index of that column within the row (in big-endian
+ * order) and the rest of the qualifier is chosen randomly.
+ */
+ public byte[][] getColumnQualifiers(byte[] row) {
+ Random random = new Random(new String(row).hashCode());
+ int numCol = (int) Math.min(Math.max(MIN_COLUMNS,
+ Math.round(random.nextGaussian() + numColumnsMean)), MAX_COLUMNS);
+ byte[][] qualifiers = new byte[numCol][];
+
+ for (int i = 0; i < numCol; i++) {
+ int qualifierLength = (int) Math.max(2,
+ Math.round(random.nextGaussian() + qualifierSizeMean));
+ qualifiers[i] = new byte[qualifierLength];
+ random.nextBytes(qualifiers[i]);
+ qualifiers[i][0] = (byte)((i >>> 8) & 0xff);
+ qualifiers[i][1] = (byte)(i & 0xff);
+ }
+ return qualifiers;
+ }
+
+ /**
+ * Column content length and value are chosen randomly, seeded by the column
+ * qualifier.
+ */
+ public byte[] getContent(byte[] row, byte[] column) {
+ Random random = new Random(new String(column).hashCode());
+ int contentLength =
+ (int) Math.max(1, random.nextGaussian() + columnSizeMean);
+ byte[] content = new byte[contentLength];
+ random.nextBytes(content);
+ return content;
+ }
+
+ /**
+ * Column content lengths and values are chosen randomly, seeded by each
+ * column qualifier.
+ */
+ public byte[][] getContents(byte[] row, byte[][] columns) {
+ byte[][] contents = new byte[columns.length][];
+ Random random = new Random();
+ for (int i = 0; i < columns.length; i++) {
+ random.setSeed(new String(columns[i]).hashCode());
+ int contentLength =
+ (int)Math.max(1, random.nextGaussian() + columnSizeMean);
+ contents[i] = new byte[contentLength];
+ random.nextBytes(contents[i]);
+ }
+ return contents;
+ }
+
+ public boolean verify(Result result) {
+ if (result.isEmpty()) {
+ return false;
+ }
+ byte[] row = result.getRow();
+ byte[][] columns = getColumnQualifiers(row);
+ byte[][] contents = getContents(row, columns);
+ boolean[] verifiedColumns = new boolean[columns.length];
+
+ for (KeyValue kv : result.list()) {
+ byte[] qualifier = kv.getQualifier();
+ // Decode the big-endian two-byte column index, masking each byte to
+ // avoid sign extension.
+ int index = ((qualifier[0] & 0xff) << 8) | (qualifier[1] & 0xff);
+ if (index >= columns.length
+ || !Arrays.equals(qualifier, columns[index])) {
+ // The column index or qualifier did not match the expected qualifier.
+ return false;
+ }
+ if (!Arrays.equals(kv.getValue(), contents[index])) {
+ return false;
+ }
+ verifiedColumns[index] = true;
+ }
+ // Check that all columns were present in the result.
+ for (boolean verifiedColumn : verifiedColumns) {
+ if (!verifiedColumn) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public Get constructGet(long key, byte[] columnFamily) {
+ Get get = new Get(DataGenerator.md5PrefixedKey(key).getBytes());
+ get.addFamily(columnFamily);
+ return get;
+ }
+
+ public Put constructBulkPut(long key, byte[] columnFamily) {
+ byte[] row = DataGenerator.md5PrefixedKey(key).getBytes();
+ byte[][] qualifiers = getColumnQualifiers(row);
+ if (qualifiers.length == 0) {
+ return null;
+ }
+ byte[][] contents = getContents(row, qualifiers);
+ Put put = new Put(row);
+ for (int i = 0; i < qualifiers.length; i++) {
+ put.add(columnFamily, qualifiers[i], contents[i]);
+ }
+ return put;
+ }
+
+ public List<Put> constructPuts(long key, byte[] columnFamily) {
+ byte[] row = DataGenerator.md5PrefixedKey(key).getBytes();
+ byte[][] qualifiers = getColumnQualifiers(row);
+ if (qualifiers.length == 0) {
+ return null;
+ }
+ byte[][] contents = getContents(row, qualifiers);
+ List<Put> puts = new ArrayList<Put>(contents.length);
+ for (int i = 0; i < qualifiers.length; i++) {
+ Put put = new Put(row);
+ put.add(columnFamily, qualifiers[i], contents[i]);
+ puts.add(put);
+ }
+ return puts;
+ }
+
+}
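Because qualifiers and contents are derived from PRNGs seeded by the row and
qualifier bytes, regenerating them for the same key is deterministic; this is
what lets verify() check a Result read back from HBase without any stored copy
of the expected data. A minimal sketch of that property (illustrative only,
not part of this revision):

    // Means of 10 columns, 16-byte qualifiers and 512-byte values.
    RandomDataGenerator gen = new RandomDataGenerator("10:16:512");
    byte[] row = DataGenerator.md5PrefixedKey(42L).getBytes();

    // Regenerating the qualifiers for the same row yields identical arrays.
    byte[][] first = gen.getColumnQualifiers(row);
    byte[][] second = gen.getColumnQualifiers(row);
    assert java.util.Arrays.deepEquals(first, second);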
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/SeparateWorkloadGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/SeparateWorkloadGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/SeparateWorkloadGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/SeparateWorkloadGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,125 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+
+/**
+ * A workload that has separate threadpools for insert operations and get
+ * operations on each client.
+ */
+public class SeparateWorkloadGenerator extends Workload.Generator {
+
+ private static final byte[] columnFamily = "actions".getBytes();
+
+ // Default rates are effectively unthrottled (one million operations per
+ // second per thread).
+ private int insertThreads = 20;
+ private int insertOpsPerSecond = insertThreads * 1000000;
+ private int getThreads = 20;
+ private int getOpsPerSecond = getThreads * 1000000;
+ private double getVerificationFraction = 0.05;
+
+ public List<List<Workload>> generateWorkloads(int numWorkloads, String args) {
+
+ if (args != null) {
+ String[] splits = args.split(":");
+ if (splits.length != 5) {
+ throw new IllegalArgumentException("expected insertOpsPerSecond:" +
+ "insertThreads:getOpsPerSecond:getThreads:getVerificationFraction");
+ }
+ insertOpsPerSecond = Integer.parseInt(splits[0]);
+ insertThreads = Integer.parseInt(splits[1]);
+ getOpsPerSecond = Integer.parseInt(splits[2]);
+ getThreads = Integer.parseInt(splits[3]);
+ getVerificationFraction = Double.parseDouble(splits[4]);
+ }
+
+ List<List<Workload>> workloads =
+ new ArrayList<List<Workload>>(numWorkloads);
+ for (int i = 0; i < numWorkloads; i++) {
+ List<Workload> clientWorkloads = new ArrayList<Workload>();
+ long startKey = Long.MAX_VALUE / numWorkloads * i;
+ KeyCounter keysWritten = new KeyCounter(startKey);
+ clientWorkloads.add(new GetWorkload(getOpsPerSecond, getThreads,
+ getVerificationFraction, keysWritten));
+ clientWorkloads.add(new InsertWorkload(startKey, insertOpsPerSecond,
+ insertThreads, keysWritten));
+ workloads.add(clientWorkloads);
+ }
+ return workloads;
+ }
+
+ public HTableDescriptor getTableDescriptor() {
+ HTableDescriptor desc = new HTableDescriptor();
+ desc.addFamily(new HColumnDescriptor(columnFamily));
+ return desc;
+ }
+
+ public static class GetWorkload implements Workload {
+
+ private static final long serialVersionUID = 4077118754127556529L;
+ private int opsPerSecond;
+ private int numThreads;
+ private double getVerificationFraction;
+ private KeyCounter keysWritten;
+
+ public GetWorkload(int opsPerSecond, int numThreads,
+ double getVerificationFraction, KeyCounter keysWritten) {
+ this.opsPerSecond = opsPerSecond;
+ this.numThreads = numThreads;
+ this.getVerificationFraction = getVerificationFraction;
+ this.keysWritten = keysWritten;
+ }
+
+ public OperationGenerator constructGenerator() {
+ return new GetGenerator(columnFamily, keysWritten,
+ getVerificationFraction);
+ }
+
+ public int getNumThreads() {
+ return numThreads;
+ }
+
+ public int getOpsPerSecond() {
+ return opsPerSecond;
+ }
+
+ public EnumSet<Operation.Type> getOperationTypes() {
+ return EnumSet.of(Operation.Type.GET);
+ }
+ }
+
+ public static class InsertWorkload implements Workload {
+
+ private static final long serialVersionUID = -6717959795026317422L;
+ private long startKey;
+ private int opsPerSecond;
+ private int numThreads;
+ private KeyCounter keysWritten;
+
+ public InsertWorkload(long startKey, int opsPerSecond, int numThreads,
+ KeyCounter keysWritten) {
+ this.startKey = startKey;
+ this.opsPerSecond = opsPerSecond;
+ this.numThreads = numThreads;
+ this.keysWritten = keysWritten;
+ }
+
+ public OperationGenerator constructGenerator() {
+ return new PutGenerator(columnFamily, keysWritten, startKey, true);
+ }
+
+ public int getNumThreads() {
+ return numThreads;
+ }
+
+ public int getOpsPerSecond() {
+ return opsPerSecond;
+ }
+
+ public EnumSet<Operation.Type> getOperationTypes() {
+ return EnumSet.of(Operation.Type.BULK_PUT);
+ }
+ }
+}
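The argument string parsed above is positional:
insertOpsPerSecond:insertThreads:getOpsPerSecond:getThreads:getVerificationFraction.
A minimal configuration sketch (illustrative only, not part of this revision):

    SeparateWorkloadGenerator generator = new SeparateWorkloadGenerator();
    // Per client: 10000 inserts/s on 20 threads and 5000 gets/s on 10
    // threads, verifying 5% of gets against regenerated expected data.
    List<List<Workload>> perClient =
        generator.generateWorkloads(4, "10000:20:5000:10:0.05");
    // perClient.get(i) holds the get and insert workloads for client i; each
    // client writes its own key range starting at Long.MAX_VALUE / 4 * i.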
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/StatsCollector.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/StatsCollector.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/StatsCollector.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/StatsCollector.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,476 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.lang.management.ManagementFactory;
+import java.util.EnumSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Collects statistics on the performance of operations. StatsCollector is a
+ * singleton which manages all of the statistics for all operations within a
+ * client. The StatsCollector also provides reporters which make the collected
+ * statistics available through multiple outputs.
+ */
+public class StatsCollector {
+
+ private static final int CONSOLE_REPORTER_PERIOD_MS = 5000;
+ private static final int MRCOUNTER_REPORTER_PERIOD_MS = 5000;
+
+ private ConcurrentHashMap<Operation.Type, Stats> map;
+ private static final StatsCollector instance = new StatsCollector();
+
+ /**
+ * Get the singleton instance.
+ *
+ * @return the singleton instance
+ */
+ public static StatsCollector getInstance() {
+ return instance;
+ }
+
+ /**
+ * Private constructor to enforce singleton access.
+ */
+ private StatsCollector() {
+ this.map = new ConcurrentHashMap<Operation.Type, Stats>();
+ }
+
+ /**
+ * Get the Stats object for a certain type of operation.
+ *
+ * @param type
+ * @return the Stats object for the specified operation type
+ */
+ public Stats getStats(Operation.Type type) {
+ Stats stats = map.get(type);
+ if (stats == null) {
+ synchronized (map) {
+ // Re-read under the lock: another thread may have created the entry
+ // between the unsynchronized get and acquiring the lock, in which
+ // case its Stats instance must be returned rather than null.
+ stats = map.get(type);
+ if (stats == null) {
+ stats = new Stats(type);
+ map.put(type, stats);
+ }
+ }
+ }
+ return stats;
+ }
+
+ /**
+ * Ensure that Stats objects for the specified operation types are initialized
+ * and will be available to any reporters started after this call.
+ *
+ * @param types the set of operation types to initialize
+ */
+ public void initializeTypes(EnumSet<Operation.Type> types) {
+ for (Operation.Type type : types) {
+ getStats(type);
+ }
+ }
+
+ /**
+ * Start the MBean reporters for each of the initialized types of Stats. This
+ * should only be called once, and only after all Stats have been initialized.
+ */
+ public void startMBeanReporters() {
+ for (Stats stats : map.values()) {
+ new StatsMBeanReporter(stats);
+ }
+ }
+
+ /**
+ * Start a console reporter for each of the initialized types of Stats. This
+ * should only be called once, and only after all Stats have been initialized.
+ */
+ public void startConsoleReporters() {
+ for (Stats stats : map.values()) {
+ new StatsConsoleReporter(stats);
+ }
+ }
+
+ /**
+ * Start a mapreduce counter reporter for each of the initialized types of
+ * Stats. This should only be called once, and only after all Stats have been
+ * initialized.
+ *
+ * @param context the mapreduce context whose counters will be updated
+ */
+ public void startMapReduceCounterReporter(
+ @SuppressWarnings("rawtypes") final TaskInputOutputContext context) {
+ for (Stats stats : map.values()) {
+ new StatsMapReduceCounterReporter(stats, context);
+ }
+ }
+
+ /**
+ * Represents the statistics collected about a particular type of operation.
+ */
+ public static class Stats {
+
+ private final Operation.Type type;
+
+ private final AtomicLong numKeys;
+ private final AtomicLong numKeysVerified;
+ private final AtomicLong numColumns;
+ private final AtomicLong numErrors;
+ private final AtomicLong numFailures;
+ private final AtomicLong cumulativeOpTime;
+
+ private final long startTime;
+
+ public Stats(Operation.Type type) {
+ this.type = type;
+
+ this.numKeys = new AtomicLong(0);
+ this.numKeysVerified = new AtomicLong(0);
+ this.numColumns = new AtomicLong(0);
+ this.numErrors = new AtomicLong(0);
+ this.numFailures = new AtomicLong(0);
+ this.cumulativeOpTime = new AtomicLong(0);
+
+ this.startTime = System.currentTimeMillis();
+ }
+
+ public Operation.Type getType() {
+ return this.type;
+ }
+
+ public long getNumKeys() {
+ return numKeys.get();
+ }
+
+ public void incrementKeys(long delta) {
+ numKeys.addAndGet(delta);
+ }
+
+ public long getNumKeysVerified() {
+ return numKeysVerified.get();
+ }
+
+ public void incrementKeysVerified(long delta) {
+ numKeysVerified.addAndGet(delta);
+ }
+
+ public long getNumColumns() {
+ return numColumns.get();
+ }
+
+ public void incrementColumns(long delta) {
+ numColumns.addAndGet(delta);
+ }
+
+ public long getNumErrors() {
+ return numErrors.get();
+ }
+
+ public void incrementErrors(long delta) {
+ numErrors.addAndGet(delta);
+ }
+
+ public long getNumFailures() {
+ return numFailures.get();
+ }
+
+ public void incrementFailures(long delta) {
+ numFailures.addAndGet(delta);
+ }
+
+ public long getCumulativeOpTime() {
+ return cumulativeOpTime.get();
+ }
+
+ public void incrementCumulativeOpTime(long delta) {
+ cumulativeOpTime.addAndGet(delta);
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+ }
+
+ /**
+ * A snapshot of an operation's stats at a particular time, to facilitate
+ * reporting of the rate of change of certain statistics.
+ */
+ public static class StatsSnapshot {
+
+ protected final Stats stats;
+
+ protected final AtomicLong priorKeysPerSecondCumulativeKeys;
+ protected final AtomicLong priorKeysPerSecondTime;
+ protected final AtomicLong priorColumnsPerSecondCumulativeColumns;
+ protected final AtomicLong priorColumnsPerSecondTime;
+ protected final AtomicLong priorLatencyCumulativeKeys;
+ protected final AtomicLong priorLatencyCumulativeLatency;
+
+ public StatsSnapshot(Stats stats) {
+ long now = System.currentTimeMillis();
+ this.stats = stats;
+ this.priorKeysPerSecondCumulativeKeys = new AtomicLong(0);
+ this.priorKeysPerSecondTime = new AtomicLong(now);
+ this.priorColumnsPerSecondCumulativeColumns = new AtomicLong(0);
+ this.priorColumnsPerSecondTime = new AtomicLong(now);
+ this.priorLatencyCumulativeKeys = new AtomicLong(0);
+ this.priorLatencyCumulativeLatency = new AtomicLong(0);
+ }
+
+ public synchronized long getKeysPerSecond() {
+ long currentTime = System.currentTimeMillis();
+ long priorTime = priorKeysPerSecondTime.getAndSet(currentTime);
+ long currentKeys = stats.getNumKeys();
+ long priorKeys = priorKeysPerSecondCumulativeKeys.getAndSet(currentKeys);
+ long timeDelta = currentTime - priorTime;
+ if (timeDelta == 0) {
+ return 0;
+ }
+ return 1000 * (currentKeys - priorKeys) / timeDelta;
+ }
+
+ public synchronized long getColumnsPerSecond() {
+ long currentTime = System.currentTimeMillis();
+ long priorTime = priorColumnsPerSecondTime.getAndSet(currentTime);
+ long currentColumns = stats.getNumColumns();
+ long priorColumns = priorColumnsPerSecondCumulativeColumns
+ .getAndSet(currentColumns);
+ long timeDelta = currentTime - priorTime;
+ if (timeDelta == 0) {
+ return 0;
+ }
+ return 1000 * (currentColumns - priorColumns) / timeDelta;
+ }
+
+ public synchronized long getAverageLatency() {
+ long currentLatency = stats.getCumulativeOpTime();
+ long priorLatency = priorLatencyCumulativeLatency
+ .getAndSet(currentLatency);
+ long currentKeys = stats.getNumKeys();
+ long priorKeys = priorLatencyCumulativeKeys.getAndSet(currentKeys);
+ long keyDelta = currentKeys - priorKeys;
+ if (keyDelta == 0) {
+ return 0;
+ }
+ return (currentLatency - priorLatency) / keyDelta;
+ }
+
+ public long getCumulativeKeysPerSecond() {
+ long timeDelta = System.currentTimeMillis() - stats.getStartTime();
+ if (timeDelta == 0) {
+ return 0;
+ }
+ return 1000 * stats.getNumKeys() / timeDelta;
+ }
+
+ public long getCumulativeKeys() {
+ return stats.getNumKeys();
+ }
+
+ public long getCumulativeColumns() {
+ return stats.getNumColumns();
+ }
+
+ public long getCumulativeAverageLatency() {
+ if (stats.getNumKeys() == 0) {
+ return 0;
+ }
+ return stats.getCumulativeOpTime() / stats.getNumKeys();
+ }
+
+ public long getCumulativeErrors() {
+ return stats.getNumErrors();
+ }
+
+ public long getCumulativeOpFailures() {
+ return stats.getNumFailures();
+ }
+
+ public long getCumulativeKeysVerified() {
+ return stats.getNumKeysVerified();
+ }
+ }
+
+ /**
+ * Periodically reports an operation's stats to the standard output console.
+ */
+ private static class StatsConsoleReporter extends StatsSnapshot {
+
+ public static String formatTime(long elapsedMillis) {
+ long elapsedSeconds = elapsedMillis / 1000;
+ return String.format("%02d:%02d:%02d", elapsedSeconds / 3600,
+ (elapsedSeconds % 3600) / 60, elapsedSeconds % 60);
+ }
+
+ public StatsConsoleReporter(final Stats stats) {
+ super(stats);
+ final long start = System.currentTimeMillis();
+
+ new Thread(new Runnable() {
+ public void run() {
+ while (true) {
+ System.out.println(formatTime(System.currentTimeMillis() - start) +
+ " [" + stats.getType().name + "] " +
+ "keys/s: [current: " + getKeysPerSecond() + " average: " +
+ getCumulativeKeysPerSecond() +"] " +
+ "latency: [current: " + getAverageLatency() + " average: " +
+ getCumulativeAverageLatency() + "] " +
+ "errors: " + getCumulativeErrors() +
+ " failures: " + getCumulativeOpFailures());
+ try {
+ Thread.sleep(CONSOLE_REPORTER_PERIOD_MS);
+ } catch (InterruptedException e) {
+ // Restore the interrupt status and stop reporting.
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+ }).start();
+ }
+ }
+
+ /**
+ * Periodically reports an operation's stats to mapreduce counters. Only stats
+ * which are additive between clients will be reported.
+ */
+ private static class StatsMapReduceCounterReporter extends StatsSnapshot {
+ public StatsMapReduceCounterReporter(final Stats stats,
+ @SuppressWarnings("rawtypes") final TaskInputOutputContext context) {
+ super(stats);
+
+ new Thread(new Runnable() {
+ public void run() {
+ String typeName = stats.getType().name;
+ long oldCumulativeKeysPerSecond = 0;
+ long oldCurrentKeysPerSecond = 0;
+ long oldCumulativeKeys = 0;
+ long oldCumulativeFailures = 0;
+ long oldCumulativeErrors = 0;
+
+ while (true) {
+ long newCumulativeKeysPerSecond = getCumulativeKeysPerSecond();
+ context.getCounter("cumulativeKeysPerSecond", typeName).increment(
+ newCumulativeKeysPerSecond - oldCumulativeKeysPerSecond);
+ oldCumulativeKeysPerSecond = newCumulativeKeysPerSecond;
+
+ long newCurrentKeysPerSecond = getKeysPerSecond();
+ context.getCounter("currentKeysPerSecond", typeName).increment(
+ newCurrentKeysPerSecond - oldCurrentKeysPerSecond);
+ oldCurrentKeysPerSecond = newCurrentKeysPerSecond;
+
+ long newCumulativeKeys = stats.getNumKeys();
+ context.getCounter("cumulativeKeys", typeName).increment(
+ newCumulativeKeys - oldCumulativeKeys);
+ oldCumulativeKeys = newCumulativeKeys;
+
+ long newCumulativeFailures = getCumulativeOpFailures();
+ context.getCounter("cumulativeFailures", typeName).increment(
+ newCumulativeFailures - oldCumulativeFailures);
+ oldCumulativeFailures = newCumulativeFailures;
+
+ long newCumulativeErrors = getCumulativeErrors();
+ context.getCounter("cumulativeErrors", typeName).increment(
+ newCumulativeErrors - oldCumulativeErrors);
+ oldCumulativeErrors = newCumulativeErrors;
+
+ context.progress();
+
+ try {
+ Thread.sleep(MRCOUNTER_REPORTER_PERIOD_MS);
+ } catch (InterruptedException e) {
+ // Restore the interrupt status and stop reporting.
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+ }).start();
+ }
+ }
+
+ /**
+ * Make an operation's stats available for query via an MBean. Any rate-based
+ * stats will be calculated for the time between successive invocations of the
+ * MBean interface.
+ */
+ private static class StatsMBeanReporter extends StatsSnapshot
+ implements StatsMBeanReporterMBean {
+
+ public StatsMBeanReporter(Stats stats) {
+ super(stats);
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try {
+ ObjectName name = new ObjectName("LoadTester:name=" +
+ stats.getType().shortName);
+ mbs.registerMBean(this, name);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public interface StatsMBeanReporterMBean {
+ /**
+ * @return the average number of keys processed per second since the
+ * previous invocation of this method
+ */
+ public long getKeysPerSecond();
+
+ /**
+ * @return the average number of columns processed per second since the
+ * previous invocation of this method
+ */
+ public long getColumnsPerSecond();
+
+ /**
+ * @return the average latency of operations since the previous invocation
+ * of this method
+ */
+ public long getAverageLatency();
+
+ /**
+ * @return the average number of keys processed per second since stats
+ * collection began for this operation type
+ */
+ public long getCumulativeKeysPerSecond();
+
+ /**
+ * @return the total number of keys processed since stats collection began
+ * for this operation type
+ */
+ public long getCumulativeKeys();
+
+ /**
+ * @return the total number of columns processed since stats collection
+ * began for this operation type
+ */
+ public long getCumulativeColumns();
+
+ /**
+ * @return the average latency of operations since stats collection began
+ * for this operation type
+ */
+ public long getCumulativeAverageLatency();
+
+ /**
+ * @return the total number of errors since stats collection began for
+ * this operation type
+ */
+ public long getCumulativeErrors();
+
+ /**
+ * @return the total number of operation failures since stats collection
+ * began for this operation type
+ */
+ public long getCumulativeOpFailures();
+
+ /**
+ * @return the total number of keys verified since stats collection began
+ * for this operation type
+ */
+ public long getCumulativeKeysVerified();
+
+ }
+}
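To show how the pieces above fit together, a minimal sketch of a worker thread
feeding the singleton collector (illustrative only, not part of this revision;
it uses only methods defined in this file):

    StatsCollector collector = StatsCollector.getInstance();
    collector.initializeTypes(
        EnumSet.of(Operation.Type.GET, Operation.Type.BULK_PUT));
    collector.startConsoleReporters(); // one periodic console line per type

    StatsCollector.Stats getStats = collector.getStats(Operation.Type.GET);
    long start = System.currentTimeMillis();
    // ... execute one get operation against the table here ...
    getStats.incrementCumulativeOpTime(System.currentTimeMillis() - start);
    getStats.incrementKeys(1);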