You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2012/05/18 00:23:29 UTC

svn commit: r1339890 - in /hbase/branches/0.92: ./ src/main/java/org/apache/hadoop/hbase/util/ src/test/java/org/apache/hadoop/hbase/ src/test/java/org/apache/hadoop/hbase/util/

Author: apurtell
Date: Thu May 17 22:23:29 2012
New Revision: 1339890

URL: http://svn.apache.org/viewvc?rev=1339890&view=rev
Log:
HBASE-5124. Backport LoadTestTool to 0.92

Added:
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java
Modified:
    hbase/branches/0.92/CHANGES.txt
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/Threads.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java

Modified: hbase/branches/0.92/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/CHANGES.txt?rev=1339890&r1=1339889&r2=1339890&view=diff
==============================================================================
--- hbase/branches/0.92/CHANGES.txt (original)
+++ hbase/branches/0.92/CHANGES.txt Thu May 17 22:23:29 2012
@@ -82,6 +82,7 @@ Release 0.92.2 - Unreleased
    HBASE-5128  [uber hbck] Online automated repair of table integrity and region consistency problems
    HBASE-5599  [hbck] handle NO_VERSION_FILE and SHOULD_NOT_BE_DEPLOYED inconsistencies (fulin wang)
    HBASE-5719  Enhance hbck to sideline overlapped mega regions (Jimmy Xiang)
+   HBASE-5124  Backport LoadTestTool to 0.92 (Ted and Andrew)
 
 Release 0.92.1 - March 17th, 2012
   INCOMPATIBLE CHANGES

Added: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java (added)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java Thu May 17 22:23:29 2012
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Common base class used for HBase command-line tools. Simplifies workflow and
+ * command-line argument parsing.
+ */
+public abstract class AbstractHBaseTool implements Tool {
+
+  private static final int EXIT_SUCCESS = 0;
+  private static final int EXIT_FAILURE = 1;
+
+  private static final String HELP_OPTION = "help";
+
+  private static final Log LOG = LogFactory.getLog(AbstractHBaseTool.class);
+
+  private final Options options = new Options();
+
+  protected Configuration conf = null;
+
+  private static final Set<String> requiredOptions = new TreeSet<String>();
+
+  /**
+   * Override this to add command-line options using {@link #addOptWithArg}
+   * and similar methods.
+   */
+  protected abstract void addOptions();
+
+  /**
+   * This method is called to process the options after they have been parsed.
+   */
+  protected abstract void processOptions(CommandLine cmd);
+
+  /** The "main function" of the tool */
+  protected abstract void doWork() throws Exception;
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public final int run(String[] args) throws Exception {
+    if (conf == null) {
+      LOG.error("Tool configuration is not initialized");
+      throw new NullPointerException("conf");
+    }
+
+    CommandLine cmd;
+    try {
+      // parse the command line arguments
+      cmd = parseArgs(args);
+    } catch (ParseException e) {
+      LOG.error("Error when parsing command-line arguemnts", e);
+      printUsage();
+      return EXIT_FAILURE;
+    }
+
+    if (cmd.hasOption(HELP_OPTION) || !sanityCheckOptions(cmd)) {
+      printUsage();
+      return EXIT_FAILURE;
+    }
+
+    processOptions(cmd);
+
+    try {
+      doWork();
+    } catch (Exception e) {
+      LOG.error("Error running command-line tool", e);
+      return EXIT_FAILURE;
+    }
+    return EXIT_SUCCESS;
+  }
+
+  private boolean sanityCheckOptions(CommandLine cmd) {
+    boolean success = true;
+    for (String reqOpt : requiredOptions) {
+      if (!cmd.hasOption(reqOpt)) {
+        LOG.error("Required option -" + reqOpt + " is missing");
+        success = false;
+      }
+    }
+    return success;
+  }
+
+  private CommandLine parseArgs(String[] args) throws ParseException {
+    options.addOption(HELP_OPTION, false, "Show usage");
+    addOptions();
+    CommandLineParser parser = new BasicParser();
+    return parser.parse(options, args);
+  }
+
+  private void printUsage() {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.setWidth(80);
+    String usageHeader = "Options:";
+    String usageFooter = "";
+    String usageStr = "bin/hbase " + getClass().getName() + " <options>";
+
+    helpFormatter.printHelp(usageStr, usageHeader, options,
+        usageFooter);
+  }
+
+  protected void addRequiredOptWithArg(String opt, String description) {
+    requiredOptions.add(opt);
+    addOptWithArg(opt, description);
+  }
+
+  protected void addOptNoArg(String opt, String description) {
+    options.addOption(opt, false, description);
+  }
+
+  protected void addOptWithArg(String opt, String description) {
+    options.addOption(opt, true, description);
+  }
+
+  /**
+   * Parse a number and enforce a range.
+   */
+  public static long parseLong(String s, long minValue, long maxValue) {
+    long l = Long.parseLong(s);
+    if (l < minValue || l > maxValue) {
+      throw new IllegalArgumentException("The value " + l
+          + " is out of range [" + minValue + ", " + maxValue + "]");
+    }
+    return l;
+  }
+
+  public static int parseInt(String s, int minValue, int maxValue) {
+    return (int) parseLong(s, minValue, maxValue);
+  }
+
+  /** Call this from the concrete tool class's main function. */
+  protected void doStaticMain(String args[]) {
+    int ret;
+    try {
+      ret = ToolRunner.run(HBaseConfiguration.create(), this, args);
+    } catch (Exception ex) {
+      LOG.error("Error running command-line tool", ex);
+      ret = EXIT_FAILURE;
+    }
+    System.exit(ret);
+  }
+
+}

Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1339890&r1=1339889&r2=1339890&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/Threads.java Thu May 17 22:23:29 2012
@@ -151,4 +151,27 @@ public class Threads {
       }
     };
   }
+
+  /**
+   * Sleeps for the given amount of time even if interrupted. Preserves
+   * the interrupt status.
+   * @param msToWait the amount of time to sleep in milliseconds
+   */
+  public static void sleepWithoutInterrupt(final long msToWait) {
+    long timeMillis = System.currentTimeMillis();
+    long endTime = timeMillis + msToWait;
+    boolean interrupted = false;
+    while (timeMillis < endTime) {
+      try {
+        Thread.sleep(endTime - timeMillis);
+      } catch (InterruptedException ex) {
+        interrupted = true;
+      }
+      timeMillis = System.currentTimeMillis();
+    }
+
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
 }

Modified: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1339890&r1=1339889&r2=1339890&view=diff
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu May 17 22:23:29 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -59,6 +60,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
@@ -88,6 +90,12 @@ public class HBaseTestingUtility {
   private Configuration conf;
   private MiniZooKeeperCluster zkCluster = null;
   /**
+   * The default number of regions per regionserver when creating a pre-split
+   * table.
+   */
+  private static int DEFAULT_REGIONS_PER_SERVER = 5;
+
+  /**
    * Set if we were passed a zkCluster.  If so, we won't shutdown zk as
    * part of general shutdown.
    */
@@ -1596,4 +1604,48 @@ public class HBaseTestingUtility {
     return zkw;
   }
   
+  /**
+   * Creates a pre-split table for load testing. If the table already exists,
+   * logs a warning and continues.
+   * @return the number of regions the table was split into
+   */
+  public static int createPreSplitLoadTestTable(Configuration conf,
+      byte[] tableName, byte[] columnFamily, Algorithm compression) throws IOException {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
+    hcd.setCompressionType(compression);
+    desc.addFamily(hcd);
+
+    int totalNumberOfRegions = 0;
+    try {
+      HBaseAdmin admin = new HBaseAdmin(conf);
+
+      // create a table a pre-splits regions.
+      // The number of splits is set as:
+      //    region servers * regions per region server).
+      int numberOfServers = admin.getClusterStatus().getServers().size();
+      if (numberOfServers == 0) {
+        throw new IllegalStateException("No live regionservers");
+      }
+
+      totalNumberOfRegions = numberOfServers * DEFAULT_REGIONS_PER_SERVER;
+      LOG.info("Number of live regionservers: " + numberOfServers + ", " +
+          "pre-splitting table into " + totalNumberOfRegions + " regions " +
+          "(default regions per server: " + DEFAULT_REGIONS_PER_SERVER + ")");
+
+      byte[][] splits = new RegionSplitter.MD5StringSplit().split(
+          totalNumberOfRegions);
+
+      admin.createTable(desc, splits);
+      admin.close();
+    } catch (MasterNotRunningException e) {
+      LOG.error("Master not running", e);
+      throw new IOException(e);
+    } catch (TableExistsException e) {
+      LOG.warn("Table " + Bytes.toStringBinary(tableName) +
+          " already exists, continuing");
+    }
+    return totalNumberOfRegions;
+  }
+
 }

Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java Thu May 17 22:23:29 2012
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.Random;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.MD5Hash;
+
+/**
+ * A generator of random keys and values for load testing. Keys are generated
+ * by converting numeric indexes to strings and prefixing them with an MD5
+ * hash. Values are generated by selecting value size in the configured range
+ * and generating a pseudo-random sequence of bytes seeded by key, column
+ * qualifier, and value size.
+ * <p>
+ * Not thread-safe, so a separate instance is needed for every writer thread/
+ */
+public class LoadTestKVGenerator {
+
+  /** A random number generator for determining value size */
+  private Random randomForValueSize = new Random();
+
+  private final int minValueSize;
+  private final int maxValueSize;
+
+  public LoadTestKVGenerator(int minValueSize, int maxValueSize) {
+    if (minValueSize <= 0 || maxValueSize <= 0) {
+      throw new IllegalArgumentException("Invalid min/max value sizes: " +
+          minValueSize + ", " + maxValueSize);
+    }
+    this.minValueSize = minValueSize;
+    this.maxValueSize = maxValueSize;
+  }
+
+  /**
+   * Verifies that the given byte array is the same as what would be generated
+   * for the given row key and qualifier. We are assuming that the value size
+   * is correct, and only verify the actual bytes. However, if the min/max
+   * value sizes are set sufficiently high, an accidental match should be
+   * extremely improbable.
+   */
+  public static boolean verify(String rowKey, String qual, byte[] value) {
+    byte[] expectedData = getValueForRowColumn(rowKey, qual, value.length);
+    return Bytes.equals(expectedData, value);
+  }
+
+  /**
+   * Converts the given key to string, and prefixes it with the MD5 hash of
+   * the index's string representation.
+   */
+  public static String md5PrefixedKey(long key) {
+    String stringKey = Long.toString(key);
+    String md5hash = MD5Hash.getMD5AsHex(Bytes.toBytes(stringKey));
+
+    // flip the key to randomize
+    return md5hash + "-" + stringKey;
+  }
+
+  /**
+   * Generates a value for the given key index and column qualifier. Size is
+   * selected randomly in the configured range. The generated value depends
+   * only on the combination of the key, qualifier, and the selected value
+   * size. This allows to verify the actual value bytes when reading, as done
+   * in {@link #verify(String, String, byte[])}.
+   */
+  public byte[] generateRandomSizeValue(long key, String qual) {
+    String rowKey = md5PrefixedKey(key);
+    int dataSize = minValueSize + randomForValueSize.nextInt(
+        Math.abs(maxValueSize - minValueSize));
+    return getValueForRowColumn(rowKey, qual, dataSize);
+  }
+
+  /**
+   * Generates random bytes of the given size for the given row and column
+   * qualifier. The random seed is fully determined by these parameters.
+   */
+  private static byte[] getValueForRowColumn(String rowKey, String qual,
+      int dataSize) {
+    Random seededRandom = new Random(rowKey.hashCode() + qual.hashCode() +
+        dataSize);
+    byte[] randomBytes = new byte[dataSize];
+    seededRandom.nextBytes(randomBytes);
+    return randomBytes;
+  }
+
+}

Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Thu May 17 22:23:29 2012
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+/**
+ * A command-line utility that reads, writes, and verifies data. Unlike
+ * {@link PerformanceEvaluation}, this tool validates the data written,
+ * and supports simultaneously writing and reading the same set of keys.
+ */
+public class LoadTestTool extends AbstractHBaseTool {
+
+  private static final Log LOG = LogFactory.getLog(LoadTestTool.class);
+
+  /** Table name for the test */
+  private byte[] tableName;
+
+  /** Table name to use of not overridden on the command line */
+  private static final String DEFAULT_TABLE_NAME = "cluster_test";
+
+  /** Column family used by the test */
+  static byte[] COLUMN_FAMILY = Bytes.toBytes("test_cf");
+
+  /** Column families used by the test */
+  static final byte[][] COLUMN_FAMILIES = { COLUMN_FAMILY };
+
+  /** The number of reader/writer threads if not specified */
+  private static final int DEFAULT_NUM_THREADS = 20;
+
+  /** Usage string for the load option */
+  private static final String OPT_USAGE_LOAD =
+      "<avg_cols_per_key>:<avg_data_size>" +
+      "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
+
+  /** Usa\ge string for the read option */
+  private static final String OPT_USAGE_READ =
+      "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
+
+  private static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
+      Arrays.toString(StoreFile.BloomType.values());
+
+  private static final String OPT_USAGE_COMPRESSION = "Compression type, " +
+      "one of " + Arrays.toString(Compression.Algorithm.values());
+
+  private static final String OPT_BLOOM = "bloom";
+  private static final String OPT_COMPRESSION = "compression";
+
+  private static final String OPT_KEY_WINDOW = "key_window";
+  private static final String OPT_WRITE = "write";
+  private static final String OPT_MAX_READ_ERRORS = "max_read_errors";
+  private static final String OPT_MULTIPUT = "multiput";
+  private static final String OPT_NUM_KEYS = "num_keys";
+  private static final String OPT_READ = "read";
+  private static final String OPT_START_KEY = "start_key";
+  private static final String OPT_TABLE_NAME = "tn";
+  private static final String OPT_ZK_QUORUM = "zk";
+
+  private static final long DEFAULT_START_KEY = 0;
+
+  /** This will be removed as we factor out the dependency on command line */
+  private CommandLine cmd;
+
+  private MultiThreadedWriter writerThreads = null;
+  private MultiThreadedReader readerThreads = null;
+
+  private long startKey, endKey;
+
+  private boolean isWrite, isRead;
+
+  // Column family options
+  private Compression.Algorithm compressAlgo;
+  private StoreFile.BloomType bloomType;
+
+  // Writer options
+  private int numWriterThreads = DEFAULT_NUM_THREADS;
+  private long minColsPerKey, maxColsPerKey;
+  private int minColDataSize, maxColDataSize;
+  private boolean isMultiPut;
+
+  // Reader options
+  private int numReaderThreads = DEFAULT_NUM_THREADS;
+  private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
+  private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
+  private int verifyPercent;
+
+  private String[] splitColonSeparated(String option,
+      int minNumCols, int maxNumCols) {
+    String optVal = cmd.getOptionValue(option);
+    String[] cols = optVal.split(":");
+    if (cols.length < minNumCols || cols.length > maxNumCols) {
+      throw new IllegalArgumentException("Expected at least "
+          + minNumCols + " columns but no more than " + maxNumCols +
+          " in the colon-separated value '" + optVal + "' of the " +
+          "-" + option + " option");
+    }
+    return cols;
+  }
+
+  private int getNumThreads(String numThreadsStr) {
+    return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
+  }
+
+  /**
+   * Apply column family options such as Bloom filters, compression, and data
+   * block encoding.
+   */
+  private void applyColumnFamilyOptions(byte[] tableName,
+      byte[][] columnFamilies) throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    HTableDescriptor tableDesc = admin.getTableDescriptor(tableName);
+    LOG.info("Disabling table " + Bytes.toString(tableName));
+    admin.disableTable(tableName);
+    for (byte[] cf : columnFamilies) {
+      HColumnDescriptor columnDesc = tableDesc.getFamily(cf);
+      if (bloomType != null) {
+        columnDesc.setBloomFilterType(bloomType);
+      }
+      if (compressAlgo != null) {
+        columnDesc.setCompressionType(compressAlgo);
+      }
+      admin.modifyColumn(tableName, columnDesc);
+    }
+    LOG.info("Enabling table " + Bytes.toString(tableName));
+    admin.enableTable(tableName);
+  }
+
+  @Override
+  protected void addOptions() {
+    addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " +
+        "without port numbers");
+    addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
+    addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
+    addOptWithArg(OPT_READ, OPT_USAGE_READ);
+    addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
+    addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
+    addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
+        "to tolerate before terminating all reader threads. The default is " +
+        MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
+    addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
+        "reads and writes for concurrent write/read workload. The default " +
+        "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
+
+    addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
+        "separate puts for every column in a row");
+
+    addRequiredOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
+    addOptWithArg(OPT_START_KEY, "The first key to read/write " +
+        "(a 0-based index). The default value is " +
+        DEFAULT_START_KEY + ".");
+  }
+
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    this.cmd = cmd;
+
+    tableName = Bytes.toBytes(cmd.getOptionValue(OPT_TABLE_NAME,
+        DEFAULT_TABLE_NAME));
+    startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
+        String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
+    long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
+        Long.MAX_VALUE - startKey);
+    endKey = startKey + numKeys;
+
+    isWrite = cmd.hasOption(OPT_WRITE);
+    isRead = cmd.hasOption(OPT_READ);
+
+    if (!isWrite && !isRead) {
+      throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
+          "-" + OPT_READ + " has to be specified");
+    }
+
+    parseColumnFamilyOptions(cmd);
+
+    if (isWrite) {
+      String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);
+
+      int colIndex = 0;
+      minColsPerKey = 1;
+      maxColsPerKey = 2 * Long.parseLong(writeOpts[colIndex++]);
+      int avgColDataSize =
+          parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
+      minColDataSize = avgColDataSize / 2;
+      maxColDataSize = avgColDataSize * 3 / 2;
+
+      if (colIndex < writeOpts.length) {
+        numWriterThreads = getNumThreads(writeOpts[colIndex++]);
+      }
+
+      isMultiPut = cmd.hasOption(OPT_MULTIPUT);
+
+      System.out.println("Multi-puts: " + isMultiPut);
+      System.out.println("Columns per key: " + minColsPerKey + ".."
+          + maxColsPerKey);
+      System.out.println("Data size per column: " + minColDataSize + ".."
+          + maxColDataSize);
+    }
+
+    if (isRead) {
+      String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
+      int colIndex = 0;
+      verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
+      if (colIndex < readOpts.length) {
+        numReaderThreads = getNumThreads(readOpts[colIndex++]);
+      }
+
+      if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
+        maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS),
+            0, Integer.MAX_VALUE);
+      }
+
+      if (cmd.hasOption(OPT_KEY_WINDOW)) {
+        keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW),
+            0, Integer.MAX_VALUE);
+      }
+
+      System.out.println("Percent of keys to verify: " + verifyPercent);
+      System.out.println("Reader threads: " + numReaderThreads);
+    }
+
+    System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
+  }
+
+  private void parseColumnFamilyOptions(CommandLine cmd) {
+    String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
+    compressAlgo = compressStr == null ? null :
+        Compression.Algorithm.valueOf(compressStr);
+
+    String bloomStr = cmd.getOptionValue(OPT_BLOOM);
+    bloomType = bloomStr == null ? null :
+        StoreFile.BloomType.valueOf(bloomStr);
+  }
+
+  @Override
+  protected void doWork() throws IOException {
+    if (cmd.hasOption(OPT_ZK_QUORUM)) {
+      conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
+    }
+
+    HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
+        COLUMN_FAMILY, compressAlgo);
+    applyColumnFamilyOptions(tableName, COLUMN_FAMILIES);
+
+    if (isWrite) {
+      writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY);
+      writerThreads.setMultiPut(isMultiPut);
+      writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey);
+      writerThreads.setDataSize(minColDataSize, maxColDataSize);
+    }
+
+    if (isRead) {
+      readerThreads = new MultiThreadedReader(conf, tableName, COLUMN_FAMILY,
+          verifyPercent);
+      readerThreads.setMaxErrors(maxReadErrors);
+      readerThreads.setKeyWindow(keyWindow);
+    }
+
+    if (isRead && isWrite) {
+      LOG.info("Concurrent read/write workload: making readers aware of the " +
+          "write point");
+      readerThreads.linkToWriter(writerThreads);
+    }
+
+    if (isWrite) {
+      System.out.println("Starting to write data...");
+      writerThreads.start(startKey, endKey, numWriterThreads);
+    }
+
+    if (isRead) {
+      System.out.println("Starting to read data...");
+      readerThreads.start(startKey, endKey, numReaderThreads);
+    }
+
+    if (isWrite) {
+      writerThreads.waitForFinish();
+    }
+
+    if (isRead) {
+      readerThreads.waitForFinish();
+    }
+  }
+
+  public static void main(String[] args) {
+    new LoadTestTool().doStaticMain(args);
+  }
+
+}

Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java Thu May 17 22:23:29 2012
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Common base class for reader and writer parts of multi-thread HBase load
+ * test ({@link LoadTestTool}).
+ */
+public abstract class MultiThreadedAction {
+  private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class);
+
+  protected final byte[] tableName;
+  protected final byte[] columnFamily;
+  protected final Configuration conf;
+
+  protected int numThreads = 1;
+
+  /** The start key of the key range, inclusive */
+  protected long startKey = 0;
+
+  /** The end key of the key range, exclusive */
+  protected long endKey = 1;
+
+  protected AtomicInteger numThreadsWorking = new AtomicInteger();
+  protected AtomicLong numKeys = new AtomicLong();
+  protected AtomicLong numCols = new AtomicLong();
+  protected AtomicLong totalOpTimeMs = new AtomicLong();
+  protected boolean verbose = false;
+
+  protected int minDataSize = 256;
+  protected int maxDataSize = 1024;
+
+  /** "R" or "W" */
+  private String actionLetter;
+
+  /** Whether we need to print out Hadoop Streaming-style counters */
+  private boolean streamingCounters;
+
+  public static final int REPORTING_INTERVAL_MS = 5000;
+
+  public MultiThreadedAction(Configuration conf, byte[] tableName,
+      byte[] columnFamily, String actionLetter) {
+    this.conf = conf;
+    this.tableName = tableName;
+    this.columnFamily = columnFamily;
+    this.actionLetter = actionLetter;
+  }
+
+  public void start(long startKey, long endKey, int numThreads)
+      throws IOException {
+    this.startKey = startKey;
+    this.endKey = endKey;
+    this.numThreads = numThreads;
+    (new Thread(new ProgressReporter(actionLetter))).start();
+  }
+
+  private static String formatTime(long elapsedTime) {
+    String format = String.format("%%0%dd", 2);
+    elapsedTime = elapsedTime / 1000;
+    String seconds = String.format(format, elapsedTime % 60);
+    String minutes = String.format(format, (elapsedTime % 3600) / 60);
+    String hours = String.format(format, elapsedTime / 3600);
+    String time =  hours + ":" + minutes + ":" + seconds;
+    return time;
+  }
+
+  /** Asynchronously reports progress */
+  private class ProgressReporter implements Runnable {
+
+    private String reporterId = "";
+
+    public ProgressReporter(String id) {
+      this.reporterId = id;
+    }
+
+    @Override
+    public void run() {
+      long startTime = System.currentTimeMillis();
+      long priorNumKeys = 0;
+      long priorCumulativeOpTime = 0;
+      int priorAverageKeysPerSecond = 0;
+
+      // Give other threads time to start.
+      Threads.sleep(REPORTING_INTERVAL_MS);
+
+      while (numThreadsWorking.get() != 0) {
+        String threadsLeft =
+            "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
+        if (numKeys.get() == 0) {
+          LOG.info(threadsLeft + "Number of keys = 0");
+        } else {
+          long numKeys = MultiThreadedAction.this.numKeys.get();
+          long time = System.currentTimeMillis() - startTime;
+          long totalOpTime = totalOpTimeMs.get();
+
+          long numKeysDelta = numKeys - priorNumKeys;
+          long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;
+
+          double averageKeysPerSecond =
+              (time > 0) ? (numKeys * 1000 / time) : 0;
+
+          LOG.info(threadsLeft
+              + "Keys="
+              + numKeys
+              + ", cols="
+              + StringUtils.humanReadableInt(numCols.get())
+              + ", time="
+              + formatTime(time)
+              + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= "
+                  + numKeys * 1000 / time + ", latency=" + totalOpTime
+                  / numKeys + " ms]") : "")
+              + ((numKeysDelta > 0) ? (" Current: [" + "keys/s="
+                  + numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency="
+                  + totalOpTimeDelta / numKeysDelta + " ms]") : "")
+              + progressInfo());
+
+          if (streamingCounters) {
+            printStreamingCounters(numKeysDelta,
+                averageKeysPerSecond - priorAverageKeysPerSecond);
+          }
+
+          priorNumKeys = numKeys;
+          priorCumulativeOpTime = totalOpTime;
+          priorAverageKeysPerSecond = (int) averageKeysPerSecond;
+        }
+
+        Threads.sleep(REPORTING_INTERVAL_MS);
+      }
+    }
+
+    private void printStreamingCounters(long numKeysDelta,
+        double avgKeysPerSecondDelta) {
+      // Write stats in a format that can be interpreted as counters by
+      // streaming map-reduce jobs.
+      System.err.println("reporter:counter:numKeys," + reporterId + ","
+          + numKeysDelta);
+      System.err.println("reporter:counter:numCols," + reporterId + ","
+          + numCols.get());
+      System.err.println("reporter:counter:avgKeysPerSecond," + reporterId
+          + "," + (long) (avgKeysPerSecondDelta));
+    }
+  }
+
+  public void setDataSize(int minDataSize, int maxDataSize) {
+    this.minDataSize = minDataSize;
+    this.maxDataSize = maxDataSize;
+  }
+
+  public void waitForFinish() {
+    while (numThreadsWorking.get() != 0) {
+      Threads.sleepWithoutInterrupt(1000);
+    }
+  }
+
+  protected void startThreads(Collection<? extends Thread> threads) {
+    numThreadsWorking.addAndGet(threads.size());
+    for (Thread thread : threads) {
+      thread.start();
+    }
+  }
+
+  /** @return the end key of the key range, exclusive */
+  public long getEndKey() {
+    return endKey;
+  }
+
+  /** Returns a task-specific progress string */
+  protected abstract String progressInfo();
+
+  protected static void appendToStatus(StringBuilder sb, String desc,
+      long v) {
+    if (v == 0) {
+      return;
+    }
+    sb.append(", ");
+    sb.append(desc);
+    sb.append("=");
+    sb.append(v);
+  }
+
+}

Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java Thu May 17 22:23:29 2012
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+
+/** Creates multiple threads that read and verify previously written data */
+public class MultiThreadedReader extends MultiThreadedAction
+{
+  private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);
+
+  private Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
+  private final double verifyPercent;
+  private volatile boolean aborted;
+
+  private MultiThreadedWriter writer = null;
+
+  /**
+   * The number of keys verified in a sequence. This will never be larger than
+   * the total number of keys in the range. The reader might also verify
+   * random keys when it catches up with the writer.
+   */
+  private final AtomicLong numUniqueKeysVerified = new AtomicLong();
+
+  /**
+   * Default maximum number of read errors to tolerate before shutting down all
+   * readers.
+   */
+  public static final int DEFAULT_MAX_ERRORS = 10;
+
+  /**
+   * Default "window" size between the last key written by the writer and the
+   * key that we attempt to read. The lower this number, the stricter our
+   * testing is. If this is zero, we always attempt to read the highest key
+   * in the contiguous sequence of keys written by the writers.
+   */
+  public static final int DEFAULT_KEY_WINDOW = 0;
+
+  protected AtomicLong numKeysVerified = new AtomicLong(0);
+  private AtomicLong numReadErrors = new AtomicLong(0);
+  private AtomicLong numReadFailures = new AtomicLong(0);
+
+  private int maxErrors = DEFAULT_MAX_ERRORS;
+  private int keyWindow = DEFAULT_KEY_WINDOW;
+
+  public MultiThreadedReader(Configuration conf, byte[] tableName,
+      byte[] columnFamily, double verifyPercent) {
+    super(conf, tableName, columnFamily, "R");
+    this.verifyPercent = verifyPercent;
+  }
+
+  public void linkToWriter(MultiThreadedWriter writer) {
+    this.writer = writer;
+    writer.setTrackInsertedKeys(true);
+  }
+
+  public void setMaxErrors(int maxErrors) {
+    this.maxErrors = maxErrors;
+  }
+
+  public void setKeyWindow(int keyWindow) {
+    this.keyWindow = keyWindow;
+  }
+
+  @Override
+  public void start(long startKey, long endKey, int numThreads)
+      throws IOException {
+    super.start(startKey, endKey, numThreads);
+    if (verbose) {
+      LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
+    }
+
+    for (int i = 0; i < numThreads; ++i) {
+      HBaseReaderThread reader = new HBaseReaderThread(i);
+      readers.add(reader);
+    }
+    startThreads(readers);
+  }
+
+  public class HBaseReaderThread extends Thread {
+    private final int readerId;
+    private final HTable table;
+    private final Random random = new Random();
+
+    /** The "current" key being read. Increases from startKey to endKey. */
+    private long curKey;
+
+    /** Time when the thread started */
+    private long startTimeMs;
+
+    /** If we are ahead of the writer and reading a random key. */
+    private boolean readingRandomKey;
+
+    /**
+     * @param readerId only the keys with this remainder from division by
+     *          {@link #numThreads} will be read by this thread
+     */
+    public HBaseReaderThread(int readerId) throws IOException {
+      this.readerId = readerId;
+      table = new HTable(conf, tableName);
+      setName(getClass().getSimpleName() + "_" + readerId);
+    }
+
+    @Override
+    public void run() {
+      try {
+        runReader();
+      } finally {
+        try {
+          table.close();
+        } catch (IOException e) {
+          LOG.error("Error closing table", e);
+        }
+        numThreadsWorking.decrementAndGet();
+      }
+    }
+
+    private void runReader() {
+      if (verbose) {
+        LOG.info("Started thread #" + readerId + " for reads...");
+      }
+
+      startTimeMs = System.currentTimeMillis();
+      curKey = startKey;
+      while (curKey < endKey && !aborted) {
+        long k = getNextKeyToRead();
+
+        // A sanity check for the key range.
+        if (k < startKey || k >= endKey) {
+          numReadErrors.incrementAndGet();
+          throw new AssertionError("Load tester logic error: proposed key " +
+              "to read " + k + " is out of range (startKey=" + startKey +
+              ", endKey=" + endKey + ")");
+        }
+
+        if (k % numThreads != readerId ||
+            writer != null && writer.failedToWriteKey(k)) {
+          // Skip keys that this thread should not read, as well as the keys
+          // that we know the writer failed to write.
+          continue;
+        }
+
+        readKey(k);
+        if (k == curKey - 1 && !readingRandomKey) {
+          // We have verified another unique key.
+          numUniqueKeysVerified.incrementAndGet();
+        }
+      }
+    }
+
+    /**
+     * Should only be used for the concurrent writer/reader workload. The
+     * maximum key we are allowed to read, subject to the "key window"
+     * constraint.
+     */
+    private long maxKeyWeCanRead() {
+      long insertedUpToKey = writer.insertedUpToKey();
+      if (insertedUpToKey >= endKey - 1) {
+        // The writer has finished writing our range, so we can read any
+        // key in the range.
+        return endKey - 1;
+      }
+      return Math.min(endKey - 1, writer.insertedUpToKey() - keyWindow);
+    }
+
+    private long getNextKeyToRead() {
+      readingRandomKey = false;
+      if (writer == null || curKey <= maxKeyWeCanRead()) {
+        return curKey++;
+      }
+
+      // We caught up with the writer. See if we can read any keys at all.
+      long maxKeyToRead;
+      while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
+        // The writer has not written sufficient keys for us to be able to read
+        // anything at all. Sleep a bit. This should only happen in the
+        // beginning of a load test run.
+        Threads.sleepWithoutInterrupt(50);
+      }
+
+      if (curKey <= maxKeyToRead) {
+        // The writer wrote some keys, and we are now allowed to read our
+        // current key.
+        return curKey++;
+      }
+
+      // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
+      // Don't increment the current key -- we still have to try reading it
+      // later. Set a flag to make sure that we don't count this key towards
+      // the set of unique keys we have verified.
+      readingRandomKey = true;
+      return startKey + Math.abs(random.nextLong())
+          % (maxKeyToRead - startKey + 1);
+    }
+
+    private Get readKey(long keyToRead) {
+      Get get = new Get(
+          LoadTestKVGenerator.md5PrefixedKey(keyToRead).getBytes());
+      get.addFamily(columnFamily);
+
+      try {
+        if (verbose) {
+          LOG.info("[" + readerId + "] " + "Querying key " + keyToRead
+              + ", cf " + Bytes.toStringBinary(columnFamily));
+        }
+        queryKey(get, random.nextInt(100) < verifyPercent);
+      } catch (IOException e) {
+        numReadFailures.addAndGet(1);
+        LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
+            + ", time from start: "
+            + (System.currentTimeMillis() - startTimeMs) + " ms");
+      }
+      return get;
+    }
+
+    public void queryKey(Get get, boolean verify) throws IOException {
+      String rowKey = new String(get.getRow());
+
+      // read the data
+      long start = System.currentTimeMillis();
+      Result result = table.get(get);
+      totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
+      numKeys.addAndGet(1);
+
+      // if we got no data report error
+      if (result.isEmpty()) {
+         HRegionLocation hloc = table.getRegionLocation(
+             Bytes.toBytes(rowKey));
+        LOG.info("Key = " + rowKey + ", RegionServer: "
+            + hloc.getHostname());
+        numReadErrors.addAndGet(1);
+        LOG.error("No data returned, tried to get actions for key = "
+            + rowKey + (writer == null ? "" : ", keys inserted by writer: " +
+                writer.numKeys.get() + ")"));
+
+         if (numReadErrors.get() > maxErrors) {
+          LOG.error("Aborting readers -- found more than " + maxErrors
+              + " errors\n");
+           aborted = true;
+         }
+      }
+
+      if (result.getFamilyMap(columnFamily) != null) {
+        // increment number of columns read
+        numCols.addAndGet(result.getFamilyMap(columnFamily).size());
+
+        if (verify) {
+          // verify the result
+          List<KeyValue> keyValues = result.list();
+          for (KeyValue kv : keyValues) {
+            String qual = new String(kv.getQualifier());
+
+            // if something does not look right report it
+            if (!LoadTestKVGenerator.verify(rowKey, qual, kv.getValue())) {
+              numReadErrors.addAndGet(1);
+              LOG.error("Error checking data for key = " + rowKey
+                  + ", actionId = " + qual);
+            }
+          }
+          numKeysVerified.addAndGet(1);
+        }
+      }
+    }
+
+  }
+
+  public long getNumReadFailures() {
+    return numReadFailures.get();
+  }
+
+  public long getNumReadErrors() {
+    return numReadErrors.get();
+  }
+
+  public long getNumKeysVerified() {
+    return numKeysVerified.get();
+  }
+
+  public long getNumUniqueKeysVerified() {
+    return numUniqueKeysVerified.get();
+  }
+
+  @Override
+  protected String progressInfo() {
+    StringBuilder sb = new StringBuilder();
+    appendToStatus(sb, "verified", numKeysVerified.get());
+    appendToStatus(sb, "READ FAILURES", numReadFailures.get());
+    appendToStatus(sb, "READ ERRORS", numReadErrors.get());
+    return sb.toString();
+  }
+
+}

Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java Thu May 17 22:23:29 2012
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+
+/** Creates multiple threads that write key/values into the */
+public class MultiThreadedWriter extends MultiThreadedAction {
+  private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
+
+  private long minColumnsPerKey = 1;
+  private long maxColumnsPerKey = 10;
+  private Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
+
+  private boolean isMultiPut = false;
+
+  /**
+   * A temporary place to keep track of inserted keys. This is written to by
+   * all writers and is drained on a separate thread that populates
+   * {@link #insertedUpToKey}, the maximum key in the contiguous range of keys
+   * being inserted. This queue is supposed to stay small.
+   */
+  private BlockingQueue<Long> insertedKeys =
+      new ArrayBlockingQueue<Long>(10000);
+
+  /**
+   * This is the current key to be inserted by any thread. Each thread does an
+   * atomic get and increment operation and inserts the current value.
+   */
+  private AtomicLong nextKeyToInsert = new AtomicLong();
+
+  /**
+   * The highest key in the contiguous range of keys .
+   */
+  private AtomicLong insertedUpToKey = new AtomicLong();
+
+  /** The sorted set of keys NOT inserted by the writers */
+  private Set<Long> failedKeySet = new ConcurrentSkipListSet<Long>();
+
+  /**
+   * The total size of the temporary inserted key set that have not yet lined
+   * up in a our contiguous sequence starting from startKey. Supposed to stay
+   * small.
+   */
+  private AtomicLong insertedKeyQueueSize = new AtomicLong();
+
+  /** Enable this if used in conjunction with a concurrent reader. */
+  private boolean trackInsertedKeys;
+
+  public MultiThreadedWriter(Configuration conf, byte[] tableName,
+      byte[] columnFamily) {
+    super(conf, tableName, columnFamily, "W");
+  }
+
+  /** Use multi-puts vs. separate puts for every column in a row */
+  public void setMultiPut(boolean isMultiPut) {
+    this.isMultiPut = isMultiPut;
+  }
+
+  public void setColumnsPerKey(long minColumnsPerKey, long maxColumnsPerKey) {
+    this.minColumnsPerKey = minColumnsPerKey;
+    this.maxColumnsPerKey = maxColumnsPerKey;
+  }
+
+  @Override
+  public void start(long startKey, long endKey, int numThreads)
+      throws IOException {
+    super.start(startKey, endKey, numThreads);
+
+    if (verbose) {
+      LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
+    }
+
+    nextKeyToInsert.set(startKey);
+    insertedUpToKey.set(startKey - 1);
+
+    for (int i = 0; i < numThreads; ++i) {
+      HBaseWriterThread writer = new HBaseWriterThread(i);
+      writers.add(writer);
+    }
+
+    if (trackInsertedKeys) {
+      new Thread(new InsertedKeysTracker()).start();
+      numThreadsWorking.incrementAndGet();
+    }
+
+    startThreads(writers);
+  }
+
+  public static byte[] longToByteArrayKey(long rowKey) {
+    return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
+  }
+
+  private class HBaseWriterThread extends Thread {
+    private final HTable table;
+    private final int writerId;
+
+    private final Random random = new Random();
+    private final LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(
+        minDataSize, maxDataSize);
+
+    public HBaseWriterThread(int writerId) throws IOException {
+      setName(getClass().getSimpleName() + "_" + writerId);
+      table = new HTable(conf, tableName);
+      this.writerId = writerId;
+    }
+
+    public void run() {
+      try {
+        long rowKey;
+        while ((rowKey = nextKeyToInsert.getAndIncrement()) < endKey) {
+          long numColumns = minColumnsPerKey + Math.abs(random.nextLong())
+              % (maxColumnsPerKey - minColumnsPerKey);
+          numKeys.addAndGet(1);
+          if (isMultiPut) {
+            multiPutInsertKey(rowKey, 0, numColumns);
+          } else {
+            for (long col = 0; col < numColumns; ++col) {
+              insert(rowKey, col);
+            }
+          }
+          if (trackInsertedKeys) {
+            insertedKeys.add(rowKey);
+          }
+        }
+      } finally {
+        try {
+          table.close();
+        } catch (IOException e) {
+          LOG.error("Error closing table", e);
+        }
+        numThreadsWorking.decrementAndGet();
+      }
+    }
+
+    public void insert(long rowKey, long col) {
+      Put put = new Put(longToByteArrayKey(rowKey));
+      String colAsStr = String.valueOf(col);
+      put.add(columnFamily, colAsStr.getBytes(),
+          dataGenerator.generateRandomSizeValue(rowKey, colAsStr));
+      try {
+        long start = System.currentTimeMillis();
+        table.put(put);
+        numCols.addAndGet(1);
+        totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
+      } catch (IOException e) {
+        failedKeySet.add(rowKey);
+        LOG.error("Failed to insert: " + rowKey);
+        e.printStackTrace();
+      }
+    }
+
+    public void multiPutInsertKey(long rowKey, long startCol, long endCol) {
+      if (verbose) {
+        LOG.debug("Preparing put for key = " + rowKey + ", cols = ["
+            + startCol + ", " + endCol + ")");
+      }
+
+      if (startCol >= endCol) {
+        return;
+      }
+
+      Put put = new Put(LoadTestKVGenerator.md5PrefixedKey(
+          rowKey).getBytes());
+      byte[] columnQualifier;
+      byte[] value;
+      for (long i = startCol; i < endCol; ++i) {
+        String qualStr = String.valueOf(i);
+        columnQualifier = qualStr.getBytes();
+        value = dataGenerator.generateRandomSizeValue(rowKey, qualStr);
+        put.add(columnFamily, columnQualifier, value);
+      }
+
+      try {
+        long start = System.currentTimeMillis();
+        table.put(put);
+        numCols.addAndGet(endCol - startCol);
+        totalOpTimeMs.addAndGet(
+            System.currentTimeMillis() - start);
+      } catch (IOException e) {
+        failedKeySet.add(rowKey);
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * A thread that keeps track of the highest key in the contiguous range of
+   * inserted keys.
+   */
+  private class InsertedKeysTracker implements Runnable {
+
+    @Override
+    public void run() {
+      Thread.currentThread().setName(getClass().getSimpleName());
+      try {
+        long expectedKey = startKey;
+        Queue<Long> sortedKeys = new PriorityQueue<Long>();
+        while (expectedKey < endKey) {
+          // Block until a new element is available.
+          Long k;
+          try {
+            k = insertedKeys.poll(1, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            LOG.info("Inserted key tracker thread interrupted", e);
+            break;
+          }
+          if (k == null) {
+            continue;
+          }
+          if (k == expectedKey) {
+            // Skip the "sorted key" queue and consume this key.
+            insertedUpToKey.set(k);
+            ++expectedKey;
+          } else {
+            sortedKeys.add(k);
+          }
+
+          // See if we have a sequence of contiguous keys lined up.
+          while (!sortedKeys.isEmpty()
+              && ((k = sortedKeys.peek()) == expectedKey)) {
+            sortedKeys.poll();
+            insertedUpToKey.set(k);
+            ++expectedKey;
+          }
+
+          insertedKeyQueueSize.set(insertedKeys.size() + sortedKeys.size());
+        }
+      } catch (Exception ex) {
+        LOG.error("Error in inserted key tracker", ex);
+      } finally {
+        numThreadsWorking.decrementAndGet();
+      }
+    }
+
+  }
+
+  @Override
+  public void waitForFinish() {
+    super.waitForFinish();
+    System.out.println("Failed to write keys: " + failedKeySet.size());
+    for (Long key : failedKeySet) {
+       System.out.println("Failed to write key: " + key);
+    }
+  }
+
+  public int getNumWriteFailures() {
+    return failedKeySet.size();
+  }
+
+  /**
+   * The max key until which all keys have been inserted (successfully or not).
+   * @return the last key that we have inserted all keys up to (inclusive)
+   */
+  public long insertedUpToKey() {
+    return insertedUpToKey.get();
+  }
+
+  public boolean failedToWriteKey(long k) {
+    return failedKeySet.contains(k);
+  }
+
+  @Override
+  protected String progressInfo() {
+    StringBuilder sb = new StringBuilder();
+    appendToStatus(sb, "insertedUpTo", insertedUpToKey.get());
+    appendToStatus(sb, "insertedQSize", insertedKeyQueueSize.get());
+    return sb.toString();
+  }
+
+  /**
+   * Used for a joint write/read workload. Enables tracking the last inserted
+   * key, which requires a blocking queue and a consumer thread.
+   * @param enable whether to enable tracking the last inserted key
+   */
+  void setTrackInsertedKeys(boolean enable) {
+    trackInsertedKeys = enable;
+  }
+
+}

Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java Thu May 17 22:23:29 2012
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.junit.Test;
+
+public class TestLoadTestKVGenerator {
+
+  private static final int MIN_LEN = 10;
+  private static final int MAX_LEN = 20;
+
+  private Random rand = new Random(28937293L);
+  private LoadTestKVGenerator gen = new LoadTestKVGenerator(MIN_LEN, MAX_LEN);
+
+  @Test
+  public void testValueLength() {
+    for (int i = 0; i < 1000; ++i) {
+      byte[] v = gen.generateRandomSizeValue(i,
+          String.valueOf(rand.nextInt()));
+      assertTrue(MIN_LEN <= v.length);
+      assertTrue(v.length <= MAX_LEN);
+    }
+  }
+
+  @Test
+  public void testVerification() {
+    for (int i = 0; i < 1000; ++i) {
+      for (int qualIndex = 0; qualIndex < 20; ++qualIndex) {
+        String qual = String.valueOf(qualIndex);
+        byte[] v = gen.generateRandomSizeValue(i, qual);
+        String rowKey = LoadTestKVGenerator.md5PrefixedKey(i);
+        assertTrue(LoadTestKVGenerator.verify(rowKey, qual, v));
+        v[0]++;
+        assertFalse(LoadTestKVGenerator.verify(rowKey, qual, v));
+      }
+    }
+  }
+
+  @Test
+  public void testCorrectAndUniqueKeys() {
+    Set<String> keys = new HashSet<String>();
+    for (int i = 0; i < 1000; ++i) {
+      String k = LoadTestKVGenerator.md5PrefixedKey(i);
+      assertFalse("Already have key '" + k + "'", keys.contains(k));
+      assertTrue("Got '" + k + "', expected it to end with '-" + i + "'",
+        k.endsWith("-" + i));
+      keys.add(k);
+    }
+  }
+
+}

Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java Thu May 17 22:23:29 2012
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * A write/read/verify load test on a mini HBase cluster. Tests reading
+ * and writing at the same time.
+ */
+@RunWith(Parameterized.class)
+public class TestMiniClusterLoadParallel
+    extends TestMiniClusterLoadSequential {
+
+  public TestMiniClusterLoadParallel(boolean isMultiPut) {
+    super(isMultiPut);
+  }
+
+  @Test(timeout=TIMEOUT_MS)
+  public void loadTest() throws Exception {
+    prepareForLoadTest();
+
+    readerThreads.linkToWriter(writerThreads);
+
+    writerThreads.start(0, numKeys, NUM_THREADS);
+    readerThreads.start(0, numKeys, NUM_THREADS);
+
+    writerThreads.waitForFinish();
+    readerThreads.waitForFinish();
+
+    assertEquals(0, writerThreads.getNumWriteFailures());
+    assertEquals(0, readerThreads.getNumReadFailures());
+    assertEquals(0, readerThreads.getNumReadErrors());
+    assertEquals(numKeys, readerThreads.getNumUniqueKeysVerified());
+  }
+
+}

Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java Thu May 17 22:23:29 2012
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * A write/read/verify load test on a mini HBase cluster. Tests reading
+ * and then writing.
+ */
+@RunWith(Parameterized.class)
+public class TestMiniClusterLoadSequential {
+
+  private static final Log LOG = LogFactory.getLog(
+      TestMiniClusterLoadSequential.class);
+
+  protected static final byte[] TABLE = Bytes.toBytes("load_test_tbl");
+  protected static final byte[] CF = Bytes.toBytes("load_test_cf");
+  protected static final int NUM_THREADS = 8;
+  protected static final int NUM_RS = 2;
+  protected static final int TIMEOUT_MS = 120000;
+  protected static final HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+
+  protected final Configuration conf = TEST_UTIL.getConfiguration();
+  protected final boolean isMultiPut;
+
+  protected MultiThreadedWriter writerThreads;
+  protected MultiThreadedReader readerThreads;
+  protected int numKeys;
+
+  protected Compression.Algorithm compression = Compression.Algorithm.NONE;
+
+  public TestMiniClusterLoadSequential(boolean isMultiPut) {
+    this.isMultiPut = isMultiPut;
+    conf.setInt("hbase.hregion.memstore.flush.size", 1024 * 1024);
+    // We don't want any region reassignments by the load balancer during the test.
+    conf.setFloat("hbase.regions.slop", 10.0f);
+  }
+
+  @Parameters
+  public static Collection<Object[]> parameters() {
+    List<Object[]> parameters = new ArrayList<Object[]>();
+    for (boolean multiPut : new boolean[]{false, true}) {
+      parameters.add(new Object[]{multiPut});
+    }
+    return parameters;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    LOG.debug("Test setup: isMultiPut=" + isMultiPut);
+    TEST_UTIL.startMiniCluster(1, NUM_RS);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LOG.debug("Test teardown: isMultiPut=" + isMultiPut);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test(timeout=TIMEOUT_MS)
+  public void loadTest() throws Exception {
+    prepareForLoadTest();
+    runLoadTestOnExistingTable();
+  }
+
+  protected void runLoadTestOnExistingTable() throws IOException {
+    writerThreads.start(0, numKeys, NUM_THREADS);
+    writerThreads.waitForFinish();
+    assertEquals(0, writerThreads.getNumWriteFailures());
+
+    readerThreads.start(0, numKeys, NUM_THREADS);
+    readerThreads.waitForFinish();
+    assertEquals(0, readerThreads.getNumReadFailures());
+    assertEquals(0, readerThreads.getNumReadErrors());
+    assertEquals(numKeys, readerThreads.getNumKeysVerified());
+  }
+
+  protected void prepareForLoadTest() throws IOException {
+    LOG.info("Starting load test: isMultiPut=" + isMultiPut);
+    numKeys = numKeys();
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    while (admin.getClusterStatus().getServers().size() < NUM_RS) {
+      LOG.info("Sleeping until " + NUM_RS + " RSs are online");
+      Threads.sleepWithoutInterrupt(1000);
+    }
+    admin.close();
+
+    int numRegions = HBaseTestingUtility.createPreSplitLoadTestTable(conf,
+        TABLE, CF, compression);
+
+    TEST_UTIL.waitUntilAllRegionsAssigned(numRegions);
+
+    writerThreads = new MultiThreadedWriter(conf, TABLE, CF);
+    writerThreads.setMultiPut(isMultiPut);
+    readerThreads = new MultiThreadedReader(conf, TABLE, CF, 100);
+  }
+
+  protected int numKeys() {
+    return 10000;
+  }
+
+  protected HColumnDescriptor getColumnDesc(HBaseAdmin admin)
+      throws TableNotFoundException, IOException {
+    return admin.getTableDescriptor(TABLE).getFamily(CF);
+  }
+
+}