You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2012/01/18 04:06:55 UTC
svn commit: r1232728 [2/2] - in
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce: ./
loadtest/
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/VersionWorkloadGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/VersionWorkloadGenerator.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/VersionWorkloadGenerator.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/VersionWorkloadGenerator.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,117 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+
+/**
+ * A workload that inserts key-values with new timestamps into existing rows of
+ * a table which keeps all versions, using a single threadpool per client for
+ * all operations.
+ *
+ * <p>The optional argument string passed to
+ * {@link #generateWorkloads(int, String)} has exactly six colon-separated
+ * fields:
+ * {@code opsPerSecond:numThreads:insertWeight:reinsertWeight:getWeight:getVerificationFraction}.
+ * When it is null, the defaults declared on the fields below are used.
+ */
+public class VersionWorkloadGenerator extends Workload.Generator {
+
+  /** The single column family, which is configured to keep all versions. */
+  private static final byte[] columnFamily = "actions".getBytes();
+
+  // Defaults; overwritten by generateWorkloads() when arguments are supplied.
+  private int opsPerSecond = Integer.MAX_VALUE;
+  private int numThreads = 20;
+  private int insertWeight = 1;
+  private int reinsertWeight = 1;
+  private int getWeight = 1;
+  private double getVerificationFraction = 0.05;
+
+  /**
+   * Generate one {@link MessagesWorkload} per client, each starting at its own
+   * offset in the key space.
+   *
+   * @param numWorkloads the number of clients for which to generate workloads
+   * @param args six colon-separated parameters (see the class comment), or
+   *     null to use the defaults
+   * @return one single-element list of workloads per client
+   * @throws IllegalArgumentException if args does not have exactly six fields
+   *     or a field cannot be parsed as a number
+   */
+  @Override
+  public List<List<Workload>> generateWorkloads(int numWorkloads, String args) {
+    if (args != null) {
+      String[] splits = args.split(":");
+      if (splits.length != 6) {
+        throw new IllegalArgumentException("Wrong number of argument splits");
+      }
+      opsPerSecond = Integer.parseInt(splits[0]);
+      numThreads = Integer.parseInt(splits[1]);
+      insertWeight = Integer.parseInt(splits[2]);
+      reinsertWeight = Integer.parseInt(splits[3]);
+      getWeight = Integer.parseInt(splits[4]);
+      getVerificationFraction = Double.parseDouble(splits[5]);
+    }
+
+    List<List<Workload>> workloads =
+        new ArrayList<List<Workload>>(numWorkloads);
+    for (int i = 0; i < numWorkloads; i++) {
+      List<Workload> clientWorkloads = new ArrayList<Workload>();
+      // Divide the non-negative long key space evenly among the clients.
+      long startKey = Long.MAX_VALUE / numWorkloads * i;
+      // Arguments follow the MessagesWorkload constructor's declared order,
+      // in which getWeight precedes reinsertWeight.
+      clientWorkloads.add(new MessagesWorkload(startKey, opsPerSecond,
+          numThreads, insertWeight, getWeight, reinsertWeight,
+          getVerificationFraction));
+      workloads.add(clientWorkloads);
+    }
+    return workloads;
+  }
+
+  /**
+   * Describe the table used by this workload: a single column family which
+   * keeps up to Integer.MAX_VALUE versions of each cell.
+   *
+   * @return a table descriptor with the column family defined
+   */
+  @Override
+  public HTableDescriptor getTableDescriptor() {
+    HTableDescriptor desc = new HTableDescriptor();
+
+    // The column family should keep as many versions as possible.
+    HColumnDescriptor colDesc = new HColumnDescriptor(columnFamily);
+    colDesc.setMaxVersions(Integer.MAX_VALUE);
+    desc.addFamily(colDesc);
+    return desc;
+  }
+
+  /**
+   * A workload which mixes inserts of new keys, re-inserts (new versions) of
+   * already-written keys, and gets, according to relative weights.
+   */
+  public static class MessagesWorkload implements Workload {
+
+    private static final long serialVersionUID = 576338016524909210L;
+    private final long startKey;
+    private final int opsPerSecond;
+    private final int numThreads;
+    private final int insertWeight;
+    private final int reinsertWeight;
+    private final int getWeight;
+    private final double getVerificationFraction;
+
+    /**
+     * @param startKey the first key written by this workload
+     * @param opsPerSecond target rate of operations per second
+     * @param numThreads size of the thread pool executing operations
+     * @param insertWeight relative weight of inserts of new keys
+     * @param getWeight relative weight of gets
+     * @param reinsertWeight relative weight of re-inserts of existing keys
+     * @param getVerificationFraction passed to the GetGenerator; presumably
+     *     the fraction of gets whose results are verified (TODO confirm)
+     */
+    public MessagesWorkload(long startKey, int opsPerSecond, int numThreads,
+        int insertWeight, int getWeight, int reinsertWeight,
+        double getVerificationFraction) {
+      this.startKey = startKey;
+      this.opsPerSecond = opsPerSecond;
+      this.numThreads = numThreads;
+      this.insertWeight = insertWeight;
+      this.reinsertWeight = reinsertWeight;
+      this.getWeight = getWeight;
+      this.getVerificationFraction = getVerificationFraction;
+    }
+
+    /**
+     * Build a composite generator which draws from the insert, re-insert and
+     * get generators in proportion to their weights. All three share one
+     * KeyCounter seeded with startKey.
+     *
+     * @return a new composite operation generator
+     */
+    public OperationGenerator constructGenerator() {
+      KeyCounter keysWritten = new KeyCounter(startKey);
+      PutGenerator insertGenerator =
+          new PutGenerator(columnFamily, keysWritten, startKey, true);
+      PutReGenerator insertReGenerator =
+          new PutReGenerator(columnFamily, keysWritten, true);
+      // NOTE(review): Integer.MAX_VALUE and 3600000 look like a max-versions
+      // count and a one-hour window in ms -- confirm against GetGenerator.
+      GetGenerator getGenerator =
+          new GetGenerator(columnFamily, keysWritten, getVerificationFraction,
+              Integer.MAX_VALUE, 3600000);
+
+      CompositeOperationGenerator compositeGenerator =
+          new CompositeOperationGenerator();
+      compositeGenerator.addGenerator(insertGenerator, insertWeight);
+      compositeGenerator.addGenerator(insertReGenerator, reinsertWeight);
+      compositeGenerator.addGenerator(getGenerator, getWeight);
+      return compositeGenerator;
+    }
+
+    public int getNumThreads() {
+      return numThreads;
+    }
+
+    public int getOpsPerSecond() {
+      return opsPerSecond;
+    }
+
+    public EnumSet<Operation.Type> getOperationTypes() {
+      return EnumSet.of(Operation.Type.BULK_PUT, Operation.Type.GET);
+    }
+  }
+}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Workload.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Workload.java?rev=1232728&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Workload.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/loadtest/Workload.java Wed Jan 18 03:06:54 2012
@@ -0,0 +1,276 @@
+package org.apache.hadoop.hbase.mapreduce.loadtest;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+
+/**
+ * A workload encapsulates the sequence of operations to be executed (as
+ * represented by an OperationGenerator instance) as well as constraints on how
+ * those operations are to be executed, including the size of the thread pool
+ * and the target number of operations per second.
+ */
+public interface Workload extends Serializable {
+
+  /**
+   * Construct a generator which provides the operations of this workload. This
+   * method will return a new generator with each invocation, so its output
+   * should be cached and reused.
+   *
+   * @return a generator for this workload
+   */
+  public OperationGenerator constructGenerator();
+
+  /**
+   * Get the configured number of threads required to execute this workload.
+   *
+   * @return the number of threads required to execute this workload
+   */
+  public int getNumThreads();
+
+  /**
+   * Get the desired number of operations per second for this workload.
+   *
+   * @return the desired number of operations per second; an {@link Executor}
+   *     refuses to start a workload for which this is not positive
+   */
+  public int getOpsPerSecond();
+
+  /**
+   * Get the types of operations used by this workload, so other resources can
+   * be properly initialized.
+   *
+   * @return set of operation types used by this workload
+   */
+  public EnumSet<Operation.Type> getOperationTypes();
+
+  /**
+   * Workload Generators are responsible for constructing the workloads that
+   * will be run by each of the client nodes. Since different workloads may have
+   * different requirements for how workloads are distributed across clients,
+   * that logic is encapsulated in workload generator implementations.
+   */
+  public static abstract class Generator {
+
+    /**
+     * Generate the workloads for a certain number of client nodes. Each client
+     * may be assigned multiple workloads, to isolate different aspects of a
+     * workload, since each Workload instance will be executed with its own
+     * threadpool. Any parameters provided for generating the workloads will be
+     * passed as a single String, with implementation-specific formatting.
+     *
+     * @param numWorkloads the number of clients for which to generate workloads
+     * @param args the parameters for generating the workloads
+     * @return a list of lists of workloads, one for each client node
+     */
+    public abstract List<List<Workload>> generateWorkloads(
+        int numWorkloads, String args);
+
+    /**
+     * Helper method which deletes a particular table if it exists, then creates
+     * a new, pre-split table with that name according to
+     * {@link #getTableDescriptor()}.
+     *
+     * @param conf configuration used to access the HBase cluster
+     * @param tableName the name of the table to setup
+     * @throws IOException if an admin operation against the cluster fails
+     * @throws MasterNotRunningException if the master cannot be reached
+     */
+    public void setupTable(HBaseConfiguration conf, String tableName)
+        throws IOException, MasterNotRunningException {
+      // Need to modify the configuration to have a higher timeout for RPCs
+      // because creating a presplit table with many regions might take a while.
+      @SuppressWarnings("deprecation")
+      HBaseConfiguration newConf = new HBaseConfiguration(conf);
+      newConf.setInt("hbase.rpc.timeout", 5 * 60000);
+
+      HBaseAdmin admin = new HBaseAdmin(newConf);
+      // Drop any existing table of the same name so each run starts fresh.
+      for (HTableDescriptor desc : admin.listTables()) {
+        if (desc.getNameAsString().equals(tableName)) {
+          admin.disableTable(tableName);
+          admin.deleteTable(tableName);
+          break;
+        }
+      }
+      HTableDescriptor desc = getTableDescriptor();
+      desc.setName(tableName.getBytes());
+      byte[][] splitKeys = getSplitKeys(admin);
+      admin.createTable(desc, splitKeys);
+    }
+
+    /**
+     * Get a descriptor of the table used by this workload. Each workload must
+     * define the column families used by that workload, including any
+     * non-default parameters.
+     *
+     * @return a descriptor with the necessary column families defined
+     */
+    public abstract HTableDescriptor getTableDescriptor();
+
+    /**
+     * Helper method to instantiate a Workload Generator instance for a named
+     * implementation.
+     *
+     * @param className the name of the Generator implementation class
+     * @return a new instance of the named Generator implementation
+     * @throws RuntimeException wrapping any failure to load or instantiate it
+     */
+    public static Generator newInstance(String className) {
+      try {
+        return (Generator) Class.forName(className).newInstance();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    private static final int DEFAULT_REGIONS_PER_SERVER = 5;
+
+    /**
+     * Compute hex-string split keys so that the new table starts with
+     * DEFAULT_REGIONS_PER_SERVER regions per server reported by the cluster.
+     */
+    private static byte[][] getSplitKeys(HBaseAdmin admin) throws IOException {
+      int numberOfServers = admin.getClusterStatus().getServers();
+      int totalNumberOfRegions = numberOfServers * DEFAULT_REGIONS_PER_SERVER;
+      return new RegionSplitter.HexStringSplit().split(totalNumberOfRegions);
+    }
+  }
+
+  /**
+   * The Executor will execute a workload according to its configuration of
+   * threads and rate limit (operations per second), using a specified data
+   * generator. Once an executor is started, it will continue until the workload
+   * has been exhausted; {@link #waitForFinish()} then waits for every
+   * submitted operation to complete and releases the worker threads.
+   *
+   * The Executor uses a ScheduledThreadPoolExecutor to delay the execution of
+   * operations to a desired rate of operations per second. However, if there is
+   * a delay in the execution of operations and the desired rate is not
+   * maintained, then future operations will not be delayed until the overall
+   * average rate is equal to the desired rate. This may cause the rate of
+   * execution to sometimes exceed the desired rate.
+   */
+  public static class Executor {
+
+    // Maximum number of operations queued or executing, per worker thread.
+    private static final int EXECUTOR_QUEUEING_FACTOR = 10;
+    // Number of scheduler (operation-submitting) threads, as a fraction of
+    // the number of worker threads.
+    private static final double EXECUTOR_BUFFER_THREAD_FACTOR = 0.1;
+
+    private final OperationGenerator opGenerator;
+    private final DataGenerator dataGenerator;
+    // Worker pool running the operations; kept so it can be shut down.
+    private final ThreadPoolExecutor threadPool;
+    private final BoundedExecutor executor;
+    private final Workload workload;
+    private ScheduledThreadPoolExecutor scheduler;
+
+    /**
+     * @param workload the workload to execute
+     * @param dataGenerator passed to the workload's operation generator for
+     *     every operation it produces
+     */
+    public Executor(Workload workload, DataGenerator dataGenerator) {
+      this.workload = workload;
+      this.dataGenerator = dataGenerator;
+      this.opGenerator = workload.constructGenerator();
+
+      int numThreads = workload.getNumThreads();
+      int bound = EXECUTOR_QUEUEING_FACTOR * numThreads;
+      this.threadPool = new ThreadPoolExecutor(numThreads, numThreads,
+          Long.MAX_VALUE, TimeUnit.NANOSECONDS,
+          new ArrayBlockingQueue<Runnable>(bound));
+      // The semaphore bound equals the queue capacity, so a submission that
+      // acquired a permit cannot overflow the queue.
+      this.executor = new BoundedExecutor(threadPool, bound);
+    }
+
+    /**
+     * Start executing the workload.
+     *
+     * @throws IllegalArgumentException if the workload's operations per second
+     *     is not positive
+     */
+    public void start() {
+      if (workload.getOpsPerSecond() <= 0) {
+        throw new IllegalArgumentException("Workload must have positive " +
+            "number of operations per second");
+      }
+
+      int numSchedulerThreads = (int) Math.ceil(
+          workload.getNumThreads() * EXECUTOR_BUFFER_THREAD_FACTOR);
+
+      // Delay in nanoseconds between runs of each scheduler thread's task.
+      // Integer division yields 0 for very high rates (e.g. Integer.MAX_VALUE),
+      // and scheduleAtFixedRate rejects a period <= 0, so clamp to 1ns.
+      long delayNS = Math.max(1L,
+          numSchedulerThreads * 1000000000L / workload.getOpsPerSecond());
+
+      scheduler = new ScheduledThreadPoolExecutor(numSchedulerThreads);
+      final Runnable queueNextOperation = new Runnable() {
+        public void run() {
+          try {
+            // A null operation is skipped; keep asking until one is produced.
+            while (true) {
+              Operation operation = opGenerator.nextOperation(dataGenerator);
+              if (operation != null) {
+                // Blocks while too many operations are outstanding.
+                executor.submitTask(operation);
+                return;
+              }
+            }
+          } catch (InterruptedException e) {
+            // Restore the interrupt status rather than silently dropping it.
+            Thread.currentThread().interrupt();
+          } catch (OperationGenerator.ExhaustedException e) {
+            // By default, ScheduledThreadPoolExecutor cancels periodic tasks
+            // when shutdown() is called, so shutdownNow() should not be needed.
+            scheduler.shutdown();
+          }
+        }
+      };
+
+      // If a task runs longer than the period, subsequent tasks will start
+      // late. Since generating operations takes some amount of time, have
+      // multiple threads for generating and submitting operations.
+      for (int i = 0; i < numSchedulerThreads; i++) {
+        scheduler.scheduleAtFixedRate(queueNextOperation, 0, delayNS,
+            TimeUnit.NANOSECONDS);
+      }
+    }
+
+    /**
+     * Wait for the executor to finish, which happens when its workload has
+     * been exhausted and every submitted operation has completed. The worker
+     * thread pool is shut down afterwards. This must be called after start().
+     *
+     * @throws IllegalStateException if start() has not been called
+     */
+    public void waitForFinish() {
+      if (scheduler == null) {
+        throw new IllegalStateException("Executor has not started yet");
+      }
+      try {
+        scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        // No more operations can be submitted once the scheduler has
+        // terminated; let the queued ones drain and release the workers.
+        threadPool.shutdown();
+        threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    /**
+     * A wrapper for an executor which blocks threads attempting to submit tasks
+     * when a certain bound of tasks are already queued or executing.
+     */
+    private static class BoundedExecutor {
+      private final java.util.concurrent.Executor exec;
+      private final Semaphore semaphore;
+
+      public BoundedExecutor(java.util.concurrent.Executor exec, int bound) {
+        this.exec = exec;
+        this.semaphore = new Semaphore(bound);
+      }
+
+      /**
+       * Submit a task, blocking until fewer than the bound of tasks are
+       * outstanding. A task rejected by the underlying executor is dropped
+       * and its permit returned.
+       *
+       * @throws InterruptedException if interrupted while waiting for a permit
+       */
+      public void submitTask(final Runnable command)
+          throws InterruptedException {
+        semaphore.acquire();
+        try {
+          exec.execute(new Runnable() {
+            public void run() {
+              try {
+                command.run();
+              } finally {
+                semaphore.release();
+              }
+            }
+          });
+        } catch (RejectedExecutionException e) {
+          semaphore.release();
+        }
+      }
+    }
+  }
+}