You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2012/05/18 00:23:29 UTC
svn commit: r1339890 - in /hbase/branches/0.92: ./
src/main/java/org/apache/hadoop/hbase/util/
src/test/java/org/apache/hadoop/hbase/
src/test/java/org/apache/hadoop/hbase/util/
Author: apurtell
Date: Thu May 17 22:23:29 2012
New Revision: 1339890
URL: http://svn.apache.org/viewvc?rev=1339890&view=rev
Log:
HBASE-5124. Backport LoadTestTool to 0.92
Added:
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java
Modified:
hbase/branches/0.92/CHANGES.txt
hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/Threads.java
hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
Modified: hbase/branches/0.92/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/CHANGES.txt?rev=1339890&r1=1339889&r2=1339890&view=diff
==============================================================================
--- hbase/branches/0.92/CHANGES.txt (original)
+++ hbase/branches/0.92/CHANGES.txt Thu May 17 22:23:29 2012
@@ -82,6 +82,7 @@ Release 0.92.2 - Unreleased
HBASE-5128 [uber hbck] Online automated repair of table integrity and region consistency problems
HBASE-5599 [hbck] handle NO_VERSION_FILE and SHOULD_NOT_BE_DEPLOYED inconsistencies (fulin wang)
HBASE-5719 Enhance hbck to sideline overlapped mega regions (Jimmy Xiang)
+ HBASE-5124 Backport LoadTestTool to 0.92 (Ted and Andrew)
Release 0.92.1 - March 17th, 2012
INCOMPATIBLE CHANGES
Added: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java (added)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java Thu May 17 22:23:29 2012
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Common base class used for HBase command-line tools. Simplifies workflow and
+ * command-line argument parsing.
+ */
+public abstract class AbstractHBaseTool implements Tool {
+
+ private static final int EXIT_SUCCESS = 0;
+ private static final int EXIT_FAILURE = 1;
+
+ private static final String HELP_OPTION = "help";
+
+ private static final Log LOG = LogFactory.getLog(AbstractHBaseTool.class);
+
+ private final Options options = new Options();
+
+ protected Configuration conf = null;
+
+ private static final Set<String> requiredOptions = new TreeSet<String>();
+
+ /**
+ * Override this to add command-line options using {@link #addOptWithArg}
+ * and similar methods.
+ */
+ protected abstract void addOptions();
+
+ /**
+ * This method is called to process the options after they have been parsed.
+ */
+ protected abstract void processOptions(CommandLine cmd);
+
+ /** The "main function" of the tool */
+ protected abstract void doWork() throws Exception;
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public final int run(String[] args) throws Exception {
+ if (conf == null) {
+ LOG.error("Tool configuration is not initialized");
+ throw new NullPointerException("conf");
+ }
+
+ CommandLine cmd;
+ try {
+ // parse the command line arguments
+ cmd = parseArgs(args);
+ } catch (ParseException e) {
+ LOG.error("Error when parsing command-line arguemnts", e);
+ printUsage();
+ return EXIT_FAILURE;
+ }
+
+ if (cmd.hasOption(HELP_OPTION) || !sanityCheckOptions(cmd)) {
+ printUsage();
+ return EXIT_FAILURE;
+ }
+
+ processOptions(cmd);
+
+ try {
+ doWork();
+ } catch (Exception e) {
+ LOG.error("Error running command-line tool", e);
+ return EXIT_FAILURE;
+ }
+ return EXIT_SUCCESS;
+ }
+
+ private boolean sanityCheckOptions(CommandLine cmd) {
+ boolean success = true;
+ for (String reqOpt : requiredOptions) {
+ if (!cmd.hasOption(reqOpt)) {
+ LOG.error("Required option -" + reqOpt + " is missing");
+ success = false;
+ }
+ }
+ return success;
+ }
+
+ private CommandLine parseArgs(String[] args) throws ParseException {
+ options.addOption(HELP_OPTION, false, "Show usage");
+ addOptions();
+ CommandLineParser parser = new BasicParser();
+ return parser.parse(options, args);
+ }
+
+ private void printUsage() {
+ HelpFormatter helpFormatter = new HelpFormatter();
+ helpFormatter.setWidth(80);
+ String usageHeader = "Options:";
+ String usageFooter = "";
+ String usageStr = "bin/hbase " + getClass().getName() + " <options>";
+
+ helpFormatter.printHelp(usageStr, usageHeader, options,
+ usageFooter);
+ }
+
+ protected void addRequiredOptWithArg(String opt, String description) {
+ requiredOptions.add(opt);
+ addOptWithArg(opt, description);
+ }
+
+ protected void addOptNoArg(String opt, String description) {
+ options.addOption(opt, false, description);
+ }
+
+ protected void addOptWithArg(String opt, String description) {
+ options.addOption(opt, true, description);
+ }
+
+ /**
+ * Parse a number and enforce a range.
+ */
+ public static long parseLong(String s, long minValue, long maxValue) {
+ long l = Long.parseLong(s);
+ if (l < minValue || l > maxValue) {
+ throw new IllegalArgumentException("The value " + l
+ + " is out of range [" + minValue + ", " + maxValue + "]");
+ }
+ return l;
+ }
+
+ public static int parseInt(String s, int minValue, int maxValue) {
+ return (int) parseLong(s, minValue, maxValue);
+ }
+
+ /** Call this from the concrete tool class's main function. */
+ protected void doStaticMain(String args[]) {
+ int ret;
+ try {
+ ret = ToolRunner.run(HBaseConfiguration.create(), this, args);
+ } catch (Exception ex) {
+ LOG.error("Error running command-line tool", ex);
+ ret = EXIT_FAILURE;
+ }
+ System.exit(ret);
+ }
+
+}
Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1339890&r1=1339889&r2=1339890&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/util/Threads.java Thu May 17 22:23:29 2012
@@ -151,4 +151,27 @@ public class Threads {
}
};
}
+
+ /**
+ * Sleeps for the given amount of time even if interrupted. Preserves
+ * the interrupt status.
+ * @param msToWait the amount of time to sleep in milliseconds
+ */
+ public static void sleepWithoutInterrupt(final long msToWait) {
+ long timeMillis = System.currentTimeMillis();
+ long endTime = timeMillis + msToWait;
+ boolean interrupted = false;
+ while (timeMillis < endTime) {
+ try {
+ Thread.sleep(endTime - timeMillis);
+ } catch (InterruptedException ex) {
+ interrupted = true;
+ }
+ timeMillis = System.currentTimeMillis();
+ }
+
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
Modified: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1339890&r1=1339889&r2=1339890&view=diff
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu May 17 22:23:29 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -59,6 +60,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
@@ -88,6 +90,12 @@ public class HBaseTestingUtility {
private Configuration conf;
private MiniZooKeeperCluster zkCluster = null;
/**
+ * The default number of regions per regionserver when creating a pre-split
+ * table.
+ */
+ private static int DEFAULT_REGIONS_PER_SERVER = 5;
+
+ /**
* Set if we were passed a zkCluster. If so, we won't shutdown zk as
* part of general shutdown.
*/
@@ -1596,4 +1604,48 @@ public class HBaseTestingUtility {
return zkw;
}
+ /**
+ * Creates a pre-split table for load testing. If the table already exists,
+ * logs a warning and continues.
+ * @return the number of regions the table was split into
+ */
+ public static int createPreSplitLoadTestTable(Configuration conf,
+ byte[] tableName, byte[] columnFamily, Algorithm compression) throws IOException {
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
+ hcd.setCompressionType(compression);
+ desc.addFamily(hcd);
+
+ int totalNumberOfRegions = 0;
+ try {
+ HBaseAdmin admin = new HBaseAdmin(conf);
+
+ // create a table a pre-splits regions.
+ // The number of splits is set as:
+ // region servers * regions per region server).
+ int numberOfServers = admin.getClusterStatus().getServers().size();
+ if (numberOfServers == 0) {
+ throw new IllegalStateException("No live regionservers");
+ }
+
+ totalNumberOfRegions = numberOfServers * DEFAULT_REGIONS_PER_SERVER;
+ LOG.info("Number of live regionservers: " + numberOfServers + ", " +
+ "pre-splitting table into " + totalNumberOfRegions + " regions " +
+ "(default regions per server: " + DEFAULT_REGIONS_PER_SERVER + ")");
+
+ byte[][] splits = new RegionSplitter.MD5StringSplit().split(
+ totalNumberOfRegions);
+
+ admin.createTable(desc, splits);
+ admin.close();
+ } catch (MasterNotRunningException e) {
+ LOG.error("Master not running", e);
+ throw new IOException(e);
+ } catch (TableExistsException e) {
+ LOG.warn("Table " + Bytes.toStringBinary(tableName) +
+ " already exists, continuing");
+ }
+ return totalNumberOfRegions;
+ }
+
}
Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestKVGenerator.java Thu May 17 22:23:29 2012
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.Random;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.MD5Hash;
+
+/**
+ * A generator of random keys and values for load testing. Keys are generated
+ * by converting numeric indexes to strings and prefixing them with an MD5
+ * hash. Values are generated by selecting value size in the configured range
+ * and generating a pseudo-random sequence of bytes seeded by key, column
+ * qualifier, and value size.
+ * <p>
+ * Not thread-safe, so a separate instance is needed for every writer thread/
+ */
+public class LoadTestKVGenerator {
+
+ /** A random number generator for determining value size */
+ private Random randomForValueSize = new Random();
+
+ private final int minValueSize;
+ private final int maxValueSize;
+
+ public LoadTestKVGenerator(int minValueSize, int maxValueSize) {
+ if (minValueSize <= 0 || maxValueSize <= 0) {
+ throw new IllegalArgumentException("Invalid min/max value sizes: " +
+ minValueSize + ", " + maxValueSize);
+ }
+ this.minValueSize = minValueSize;
+ this.maxValueSize = maxValueSize;
+ }
+
+ /**
+ * Verifies that the given byte array is the same as what would be generated
+ * for the given row key and qualifier. We are assuming that the value size
+ * is correct, and only verify the actual bytes. However, if the min/max
+ * value sizes are set sufficiently high, an accidental match should be
+ * extremely improbable.
+ */
+ public static boolean verify(String rowKey, String qual, byte[] value) {
+ byte[] expectedData = getValueForRowColumn(rowKey, qual, value.length);
+ return Bytes.equals(expectedData, value);
+ }
+
+ /**
+ * Converts the given key to string, and prefixes it with the MD5 hash of
+ * the index's string representation.
+ */
+ public static String md5PrefixedKey(long key) {
+ String stringKey = Long.toString(key);
+ String md5hash = MD5Hash.getMD5AsHex(Bytes.toBytes(stringKey));
+
+ // flip the key to randomize
+ return md5hash + "-" + stringKey;
+ }
+
+ /**
+ * Generates a value for the given key index and column qualifier. Size is
+ * selected randomly in the configured range. The generated value depends
+ * only on the combination of the key, qualifier, and the selected value
+ * size. This allows to verify the actual value bytes when reading, as done
+ * in {@link #verify(String, String, byte[])}.
+ */
+ public byte[] generateRandomSizeValue(long key, String qual) {
+ String rowKey = md5PrefixedKey(key);
+ int dataSize = minValueSize + randomForValueSize.nextInt(
+ Math.abs(maxValueSize - minValueSize));
+ return getValueForRowColumn(rowKey, qual, dataSize);
+ }
+
+ /**
+ * Generates random bytes of the given size for the given row and column
+ * qualifier. The random seed is fully determined by these parameters.
+ */
+ private static byte[] getValueForRowColumn(String rowKey, String qual,
+ int dataSize) {
+ Random seededRandom = new Random(rowKey.hashCode() + qual.hashCode() +
+ dataSize);
+ byte[] randomBytes = new byte[dataSize];
+ seededRandom.nextBytes(randomBytes);
+ return randomBytes;
+ }
+
+}
Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Thu May 17 22:23:29 2012
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+/**
+ * A command-line utility that reads, writes, and verifies data. Unlike
+ * {@link PerformanceEvaluation}, this tool validates the data written,
+ * and supports simultaneously writing and reading the same set of keys.
+ */
+public class LoadTestTool extends AbstractHBaseTool {
+
+ private static final Log LOG = LogFactory.getLog(LoadTestTool.class);
+
+ /** Table name for the test */
+ private byte[] tableName;
+
+ /** Table name to use of not overridden on the command line */
+ private static final String DEFAULT_TABLE_NAME = "cluster_test";
+
+ /** Column family used by the test */
+ static byte[] COLUMN_FAMILY = Bytes.toBytes("test_cf");
+
+ /** Column families used by the test */
+ static final byte[][] COLUMN_FAMILIES = { COLUMN_FAMILY };
+
+ /** The number of reader/writer threads if not specified */
+ private static final int DEFAULT_NUM_THREADS = 20;
+
+ /** Usage string for the load option */
+ private static final String OPT_USAGE_LOAD =
+ "<avg_cols_per_key>:<avg_data_size>" +
+ "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
+
+ /** Usa\ge string for the read option */
+ private static final String OPT_USAGE_READ =
+ "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
+
+ private static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
+ Arrays.toString(StoreFile.BloomType.values());
+
+ private static final String OPT_USAGE_COMPRESSION = "Compression type, " +
+ "one of " + Arrays.toString(Compression.Algorithm.values());
+
+ private static final String OPT_BLOOM = "bloom";
+ private static final String OPT_COMPRESSION = "compression";
+
+ private static final String OPT_KEY_WINDOW = "key_window";
+ private static final String OPT_WRITE = "write";
+ private static final String OPT_MAX_READ_ERRORS = "max_read_errors";
+ private static final String OPT_MULTIPUT = "multiput";
+ private static final String OPT_NUM_KEYS = "num_keys";
+ private static final String OPT_READ = "read";
+ private static final String OPT_START_KEY = "start_key";
+ private static final String OPT_TABLE_NAME = "tn";
+ private static final String OPT_ZK_QUORUM = "zk";
+
+ private static final long DEFAULT_START_KEY = 0;
+
+ /** This will be removed as we factor out the dependency on command line */
+ private CommandLine cmd;
+
+ private MultiThreadedWriter writerThreads = null;
+ private MultiThreadedReader readerThreads = null;
+
+ private long startKey, endKey;
+
+ private boolean isWrite, isRead;
+
+ // Column family options
+ private Compression.Algorithm compressAlgo;
+ private StoreFile.BloomType bloomType;
+
+ // Writer options
+ private int numWriterThreads = DEFAULT_NUM_THREADS;
+ private long minColsPerKey, maxColsPerKey;
+ private int minColDataSize, maxColDataSize;
+ private boolean isMultiPut;
+
+ // Reader options
+ private int numReaderThreads = DEFAULT_NUM_THREADS;
+ private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
+ private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
+ private int verifyPercent;
+
+ private String[] splitColonSeparated(String option,
+ int minNumCols, int maxNumCols) {
+ String optVal = cmd.getOptionValue(option);
+ String[] cols = optVal.split(":");
+ if (cols.length < minNumCols || cols.length > maxNumCols) {
+ throw new IllegalArgumentException("Expected at least "
+ + minNumCols + " columns but no more than " + maxNumCols +
+ " in the colon-separated value '" + optVal + "' of the " +
+ "-" + option + " option");
+ }
+ return cols;
+ }
+
+ private int getNumThreads(String numThreadsStr) {
+ return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
+ }
+
+ /**
+ * Apply column family options such as Bloom filters, compression, and data
+ * block encoding.
+ */
+ private void applyColumnFamilyOptions(byte[] tableName,
+ byte[][] columnFamilies) throws IOException {
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ HTableDescriptor tableDesc = admin.getTableDescriptor(tableName);
+ LOG.info("Disabling table " + Bytes.toString(tableName));
+ admin.disableTable(tableName);
+ for (byte[] cf : columnFamilies) {
+ HColumnDescriptor columnDesc = tableDesc.getFamily(cf);
+ if (bloomType != null) {
+ columnDesc.setBloomFilterType(bloomType);
+ }
+ if (compressAlgo != null) {
+ columnDesc.setCompressionType(compressAlgo);
+ }
+ admin.modifyColumn(tableName, columnDesc);
+ }
+ LOG.info("Enabling table " + Bytes.toString(tableName));
+ admin.enableTable(tableName);
+ }
+
+ @Override
+ protected void addOptions() {
+ addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " +
+ "without port numbers");
+ addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
+ addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
+ addOptWithArg(OPT_READ, OPT_USAGE_READ);
+ addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
+ addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
+ addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
+ "to tolerate before terminating all reader threads. The default is " +
+ MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
+ addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
+ "reads and writes for concurrent write/read workload. The default " +
+ "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
+
+ addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
+ "separate puts for every column in a row");
+
+ addRequiredOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
+ addOptWithArg(OPT_START_KEY, "The first key to read/write " +
+ "(a 0-based index). The default value is " +
+ DEFAULT_START_KEY + ".");
+ }
+
+ @Override
+ protected void processOptions(CommandLine cmd) {
+ this.cmd = cmd;
+
+ tableName = Bytes.toBytes(cmd.getOptionValue(OPT_TABLE_NAME,
+ DEFAULT_TABLE_NAME));
+ startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
+ String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
+ long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
+ Long.MAX_VALUE - startKey);
+ endKey = startKey + numKeys;
+
+ isWrite = cmd.hasOption(OPT_WRITE);
+ isRead = cmd.hasOption(OPT_READ);
+
+ if (!isWrite && !isRead) {
+ throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
+ "-" + OPT_READ + " has to be specified");
+ }
+
+ parseColumnFamilyOptions(cmd);
+
+ if (isWrite) {
+ String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);
+
+ int colIndex = 0;
+ minColsPerKey = 1;
+ maxColsPerKey = 2 * Long.parseLong(writeOpts[colIndex++]);
+ int avgColDataSize =
+ parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
+ minColDataSize = avgColDataSize / 2;
+ maxColDataSize = avgColDataSize * 3 / 2;
+
+ if (colIndex < writeOpts.length) {
+ numWriterThreads = getNumThreads(writeOpts[colIndex++]);
+ }
+
+ isMultiPut = cmd.hasOption(OPT_MULTIPUT);
+
+ System.out.println("Multi-puts: " + isMultiPut);
+ System.out.println("Columns per key: " + minColsPerKey + ".."
+ + maxColsPerKey);
+ System.out.println("Data size per column: " + minColDataSize + ".."
+ + maxColDataSize);
+ }
+
+ if (isRead) {
+ String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
+ int colIndex = 0;
+ verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
+ if (colIndex < readOpts.length) {
+ numReaderThreads = getNumThreads(readOpts[colIndex++]);
+ }
+
+ if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
+ maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS),
+ 0, Integer.MAX_VALUE);
+ }
+
+ if (cmd.hasOption(OPT_KEY_WINDOW)) {
+ keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW),
+ 0, Integer.MAX_VALUE);
+ }
+
+ System.out.println("Percent of keys to verify: " + verifyPercent);
+ System.out.println("Reader threads: " + numReaderThreads);
+ }
+
+ System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
+ }
+
+ private void parseColumnFamilyOptions(CommandLine cmd) {
+ String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
+ compressAlgo = compressStr == null ? null :
+ Compression.Algorithm.valueOf(compressStr);
+
+ String bloomStr = cmd.getOptionValue(OPT_BLOOM);
+ bloomType = bloomStr == null ? null :
+ StoreFile.BloomType.valueOf(bloomStr);
+ }
+
+ @Override
+ protected void doWork() throws IOException {
+ if (cmd.hasOption(OPT_ZK_QUORUM)) {
+ conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
+ }
+
+ HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
+ COLUMN_FAMILY, compressAlgo);
+ applyColumnFamilyOptions(tableName, COLUMN_FAMILIES);
+
+ if (isWrite) {
+ writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY);
+ writerThreads.setMultiPut(isMultiPut);
+ writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey);
+ writerThreads.setDataSize(minColDataSize, maxColDataSize);
+ }
+
+ if (isRead) {
+ readerThreads = new MultiThreadedReader(conf, tableName, COLUMN_FAMILY,
+ verifyPercent);
+ readerThreads.setMaxErrors(maxReadErrors);
+ readerThreads.setKeyWindow(keyWindow);
+ }
+
+ if (isRead && isWrite) {
+ LOG.info("Concurrent read/write workload: making readers aware of the " +
+ "write point");
+ readerThreads.linkToWriter(writerThreads);
+ }
+
+ if (isWrite) {
+ System.out.println("Starting to write data...");
+ writerThreads.start(startKey, endKey, numWriterThreads);
+ }
+
+ if (isRead) {
+ System.out.println("Starting to read data...");
+ readerThreads.start(startKey, endKey, numReaderThreads);
+ }
+
+ if (isWrite) {
+ writerThreads.waitForFinish();
+ }
+
+ if (isRead) {
+ readerThreads.waitForFinish();
+ }
+ }
+
+ public static void main(String[] args) {
+ new LoadTestTool().doStaticMain(args);
+ }
+
+}
Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedAction.java Thu May 17 22:23:29 2012
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Common base class for reader and writer parts of multi-thread HBase load
+ * test ({@link LoadTestTool}).
+ */
+public abstract class MultiThreadedAction {
+ private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class);
+
+ protected final byte[] tableName;
+ protected final byte[] columnFamily;
+ protected final Configuration conf;
+
+ protected int numThreads = 1;
+
+ /** The start key of the key range, inclusive */
+ protected long startKey = 0;
+
+ /** The end key of the key range, exclusive */
+ protected long endKey = 1;
+
+ protected AtomicInteger numThreadsWorking = new AtomicInteger();
+ protected AtomicLong numKeys = new AtomicLong();
+ protected AtomicLong numCols = new AtomicLong();
+ protected AtomicLong totalOpTimeMs = new AtomicLong();
+ protected boolean verbose = false;
+
+ protected int minDataSize = 256;
+ protected int maxDataSize = 1024;
+
+ /** "R" or "W" */
+ private String actionLetter;
+
+ /** Whether we need to print out Hadoop Streaming-style counters */
+ private boolean streamingCounters;
+
+ public static final int REPORTING_INTERVAL_MS = 5000;
+
+ public MultiThreadedAction(Configuration conf, byte[] tableName,
+ byte[] columnFamily, String actionLetter) {
+ this.conf = conf;
+ this.tableName = tableName;
+ this.columnFamily = columnFamily;
+ this.actionLetter = actionLetter;
+ }
+
+ public void start(long startKey, long endKey, int numThreads)
+ throws IOException {
+ this.startKey = startKey;
+ this.endKey = endKey;
+ this.numThreads = numThreads;
+ (new Thread(new ProgressReporter(actionLetter))).start();
+ }
+
+ private static String formatTime(long elapsedTime) {
+ String format = String.format("%%0%dd", 2);
+ elapsedTime = elapsedTime / 1000;
+ String seconds = String.format(format, elapsedTime % 60);
+ String minutes = String.format(format, (elapsedTime % 3600) / 60);
+ String hours = String.format(format, elapsedTime / 3600);
+ String time = hours + ":" + minutes + ":" + seconds;
+ return time;
+ }
+
+ /** Asynchronously reports progress */
+ private class ProgressReporter implements Runnable {
+
+ private String reporterId = "";
+
+ public ProgressReporter(String id) {
+ this.reporterId = id;
+ }
+
+ @Override
+ public void run() {
+ long startTime = System.currentTimeMillis();
+ long priorNumKeys = 0;
+ long priorCumulativeOpTime = 0;
+ int priorAverageKeysPerSecond = 0;
+
+ // Give other threads time to start.
+ Threads.sleep(REPORTING_INTERVAL_MS);
+
+ while (numThreadsWorking.get() != 0) {
+ String threadsLeft =
+ "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
+ if (numKeys.get() == 0) {
+ LOG.info(threadsLeft + "Number of keys = 0");
+ } else {
+ long numKeys = MultiThreadedAction.this.numKeys.get();
+ long time = System.currentTimeMillis() - startTime;
+ long totalOpTime = totalOpTimeMs.get();
+
+ long numKeysDelta = numKeys - priorNumKeys;
+ long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;
+
+ double averageKeysPerSecond =
+ (time > 0) ? (numKeys * 1000 / time) : 0;
+
+ LOG.info(threadsLeft
+ + "Keys="
+ + numKeys
+ + ", cols="
+ + StringUtils.humanReadableInt(numCols.get())
+ + ", time="
+ + formatTime(time)
+ + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= "
+ + numKeys * 1000 / time + ", latency=" + totalOpTime
+ / numKeys + " ms]") : "")
+ + ((numKeysDelta > 0) ? (" Current: [" + "keys/s="
+ + numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency="
+ + totalOpTimeDelta / numKeysDelta + " ms]") : "")
+ + progressInfo());
+
+ if (streamingCounters) {
+ printStreamingCounters(numKeysDelta,
+ averageKeysPerSecond - priorAverageKeysPerSecond);
+ }
+
+ priorNumKeys = numKeys;
+ priorCumulativeOpTime = totalOpTime;
+ priorAverageKeysPerSecond = (int) averageKeysPerSecond;
+ }
+
+ Threads.sleep(REPORTING_INTERVAL_MS);
+ }
+ }
+
+ private void printStreamingCounters(long numKeysDelta,
+ double avgKeysPerSecondDelta) {
+ // Write stats in a format that can be interpreted as counters by
+ // streaming map-reduce jobs.
+ System.err.println("reporter:counter:numKeys," + reporterId + ","
+ + numKeysDelta);
+ System.err.println("reporter:counter:numCols," + reporterId + ","
+ + numCols.get());
+ System.err.println("reporter:counter:avgKeysPerSecond," + reporterId
+ + "," + (long) (avgKeysPerSecondDelta));
+ }
+ }
+
+ public void setDataSize(int minDataSize, int maxDataSize) {
+ this.minDataSize = minDataSize;
+ this.maxDataSize = maxDataSize;
+ }
+
+ public void waitForFinish() {
+ while (numThreadsWorking.get() != 0) {
+ Threads.sleepWithoutInterrupt(1000);
+ }
+ }
+
+ protected void startThreads(Collection<? extends Thread> threads) {
+ numThreadsWorking.addAndGet(threads.size());
+ for (Thread thread : threads) {
+ thread.start();
+ }
+ }
+
+ /** @return the end key of the key range, exclusive */
+ public long getEndKey() {
+ return endKey;
+ }
+
+ /** Returns a task-specific progress string */
+ protected abstract String progressInfo();
+
+ protected static void appendToStatus(StringBuilder sb, String desc,
+ long v) {
+ if (v == 0) {
+ return;
+ }
+ sb.append(", ");
+ sb.append(desc);
+ sb.append("=");
+ sb.append(v);
+ }
+
+}
Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java Thu May 17 22:23:29 2012
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+
+/** Creates multiple threads that read and verify previously written data */
+public class MultiThreadedReader extends MultiThreadedAction
+{
+ private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);
+
+ private Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
+ private final double verifyPercent;
+ private volatile boolean aborted;
+
+ private MultiThreadedWriter writer = null;
+
+ /**
+ * The number of keys verified in a sequence. This will never be larger than
+ * the total number of keys in the range. The reader might also verify
+ * random keys when it catches up with the writer.
+ */
+ private final AtomicLong numUniqueKeysVerified = new AtomicLong();
+
+ /**
+ * Default maximum number of read errors to tolerate before shutting down all
+ * readers.
+ */
+ public static final int DEFAULT_MAX_ERRORS = 10;
+
+ /**
+ * Default "window" size between the last key written by the writer and the
+ * key that we attempt to read. The lower this number, the stricter our
+ * testing is. If this is zero, we always attempt to read the highest key
+ * in the contiguous sequence of keys written by the writers.
+ */
+ public static final int DEFAULT_KEY_WINDOW = 0;
+
+ protected AtomicLong numKeysVerified = new AtomicLong(0);
+ private AtomicLong numReadErrors = new AtomicLong(0);
+ private AtomicLong numReadFailures = new AtomicLong(0);
+
+ private int maxErrors = DEFAULT_MAX_ERRORS;
+ private int keyWindow = DEFAULT_KEY_WINDOW;
+
+ public MultiThreadedReader(Configuration conf, byte[] tableName,
+ byte[] columnFamily, double verifyPercent) {
+ super(conf, tableName, columnFamily, "R");
+ this.verifyPercent = verifyPercent;
+ }
+
+ public void linkToWriter(MultiThreadedWriter writer) {
+ this.writer = writer;
+ writer.setTrackInsertedKeys(true);
+ }
+
+ public void setMaxErrors(int maxErrors) {
+ this.maxErrors = maxErrors;
+ }
+
+ public void setKeyWindow(int keyWindow) {
+ this.keyWindow = keyWindow;
+ }
+
+ @Override
+ public void start(long startKey, long endKey, int numThreads)
+ throws IOException {
+ super.start(startKey, endKey, numThreads);
+ if (verbose) {
+ LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
+ }
+
+ for (int i = 0; i < numThreads; ++i) {
+ HBaseReaderThread reader = new HBaseReaderThread(i);
+ readers.add(reader);
+ }
+ startThreads(readers);
+ }
+
+ public class HBaseReaderThread extends Thread {
+ private final int readerId;
+ private final HTable table;
+ private final Random random = new Random();
+
+ /** The "current" key being read. Increases from startKey to endKey. */
+ private long curKey;
+
+ /** Time when the thread started */
+ private long startTimeMs;
+
+ /** If we are ahead of the writer and reading a random key. */
+ private boolean readingRandomKey;
+
+ /**
+ * @param readerId only the keys with this remainder from division by
+ * {@link #numThreads} will be read by this thread
+ */
+ public HBaseReaderThread(int readerId) throws IOException {
+ this.readerId = readerId;
+ table = new HTable(conf, tableName);
+ setName(getClass().getSimpleName() + "_" + readerId);
+ }
+
+ @Override
+ public void run() {
+ try {
+ runReader();
+ } finally {
+ try {
+ table.close();
+ } catch (IOException e) {
+ LOG.error("Error closing table", e);
+ }
+ numThreadsWorking.decrementAndGet();
+ }
+ }
+
+ private void runReader() {
+ if (verbose) {
+ LOG.info("Started thread #" + readerId + " for reads...");
+ }
+
+ startTimeMs = System.currentTimeMillis();
+ curKey = startKey;
+ while (curKey < endKey && !aborted) {
+ long k = getNextKeyToRead();
+
+ // A sanity check for the key range.
+ if (k < startKey || k >= endKey) {
+ numReadErrors.incrementAndGet();
+ throw new AssertionError("Load tester logic error: proposed key " +
+ "to read " + k + " is out of range (startKey=" + startKey +
+ ", endKey=" + endKey + ")");
+ }
+
+ if (k % numThreads != readerId ||
+ writer != null && writer.failedToWriteKey(k)) {
+ // Skip keys that this thread should not read, as well as the keys
+ // that we know the writer failed to write.
+ continue;
+ }
+
+ readKey(k);
+ if (k == curKey - 1 && !readingRandomKey) {
+ // We have verified another unique key.
+ numUniqueKeysVerified.incrementAndGet();
+ }
+ }
+ }
+
+ /**
+ * Should only be used for the concurrent writer/reader workload. The
+ * maximum key we are allowed to read, subject to the "key window"
+ * constraint.
+ */
+ private long maxKeyWeCanRead() {
+ long insertedUpToKey = writer.insertedUpToKey();
+ if (insertedUpToKey >= endKey - 1) {
+ // The writer has finished writing our range, so we can read any
+ // key in the range.
+ return endKey - 1;
+ }
+ return Math.min(endKey - 1, writer.insertedUpToKey() - keyWindow);
+ }
+
+ private long getNextKeyToRead() {
+ readingRandomKey = false;
+ if (writer == null || curKey <= maxKeyWeCanRead()) {
+ return curKey++;
+ }
+
+ // We caught up with the writer. See if we can read any keys at all.
+ long maxKeyToRead;
+ while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
+ // The writer has not written sufficient keys for us to be able to read
+ // anything at all. Sleep a bit. This should only happen in the
+ // beginning of a load test run.
+ Threads.sleepWithoutInterrupt(50);
+ }
+
+ if (curKey <= maxKeyToRead) {
+ // The writer wrote some keys, and we are now allowed to read our
+ // current key.
+ return curKey++;
+ }
+
+ // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
+ // Don't increment the current key -- we still have to try reading it
+ // later. Set a flag to make sure that we don't count this key towards
+ // the set of unique keys we have verified.
+ readingRandomKey = true;
+ return startKey + Math.abs(random.nextLong())
+ % (maxKeyToRead - startKey + 1);
+ }
+
+ private Get readKey(long keyToRead) {
+ Get get = new Get(
+ LoadTestKVGenerator.md5PrefixedKey(keyToRead).getBytes());
+ get.addFamily(columnFamily);
+
+ try {
+ if (verbose) {
+ LOG.info("[" + readerId + "] " + "Querying key " + keyToRead
+ + ", cf " + Bytes.toStringBinary(columnFamily));
+ }
+ queryKey(get, random.nextInt(100) < verifyPercent);
+ } catch (IOException e) {
+ numReadFailures.addAndGet(1);
+ LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
+ + ", time from start: "
+ + (System.currentTimeMillis() - startTimeMs) + " ms");
+ }
+ return get;
+ }
+
+ public void queryKey(Get get, boolean verify) throws IOException {
+ String rowKey = new String(get.getRow());
+
+ // read the data
+ long start = System.currentTimeMillis();
+ Result result = table.get(get);
+ totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
+ numKeys.addAndGet(1);
+
+ // if we got no data report error
+ if (result.isEmpty()) {
+ HRegionLocation hloc = table.getRegionLocation(
+ Bytes.toBytes(rowKey));
+ LOG.info("Key = " + rowKey + ", RegionServer: "
+ + hloc.getHostname());
+ numReadErrors.addAndGet(1);
+ LOG.error("No data returned, tried to get actions for key = "
+ + rowKey + (writer == null ? "" : ", keys inserted by writer: " +
+ writer.numKeys.get() + ")"));
+
+ if (numReadErrors.get() > maxErrors) {
+ LOG.error("Aborting readers -- found more than " + maxErrors
+ + " errors\n");
+ aborted = true;
+ }
+ }
+
+ if (result.getFamilyMap(columnFamily) != null) {
+ // increment number of columns read
+ numCols.addAndGet(result.getFamilyMap(columnFamily).size());
+
+ if (verify) {
+ // verify the result
+ List<KeyValue> keyValues = result.list();
+ for (KeyValue kv : keyValues) {
+ String qual = new String(kv.getQualifier());
+
+ // if something does not look right report it
+ if (!LoadTestKVGenerator.verify(rowKey, qual, kv.getValue())) {
+ numReadErrors.addAndGet(1);
+ LOG.error("Error checking data for key = " + rowKey
+ + ", actionId = " + qual);
+ }
+ }
+ numKeysVerified.addAndGet(1);
+ }
+ }
+ }
+
+ }
+
+ public long getNumReadFailures() {
+ return numReadFailures.get();
+ }
+
+ public long getNumReadErrors() {
+ return numReadErrors.get();
+ }
+
+ public long getNumKeysVerified() {
+ return numKeysVerified.get();
+ }
+
+ public long getNumUniqueKeysVerified() {
+ return numUniqueKeysVerified.get();
+ }
+
+ @Override
+ protected String progressInfo() {
+ StringBuilder sb = new StringBuilder();
+ appendToStatus(sb, "verified", numKeysVerified.get());
+ appendToStatus(sb, "READ FAILURES", numReadFailures.get());
+ appendToStatus(sb, "READ ERRORS", numReadErrors.get());
+ return sb.toString();
+ }
+
+}
Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java Thu May 17 22:23:29 2012
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+
+/** Creates multiple threads that write key/values into the */
+public class MultiThreadedWriter extends MultiThreadedAction {
+ private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
+
+ private long minColumnsPerKey = 1;
+ private long maxColumnsPerKey = 10;
+ private Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
+
+ private boolean isMultiPut = false;
+
+ /**
+ * A temporary place to keep track of inserted keys. This is written to by
+ * all writers and is drained on a separate thread that populates
+ * {@link #insertedUpToKey}, the maximum key in the contiguous range of keys
+ * being inserted. This queue is supposed to stay small.
+ */
+ private BlockingQueue<Long> insertedKeys =
+ new ArrayBlockingQueue<Long>(10000);
+
+ /**
+ * This is the current key to be inserted by any thread. Each thread does an
+ * atomic get and increment operation and inserts the current value.
+ */
+ private AtomicLong nextKeyToInsert = new AtomicLong();
+
+ /**
+ * The highest key in the contiguous range of keys .
+ */
+ private AtomicLong insertedUpToKey = new AtomicLong();
+
+ /** The sorted set of keys NOT inserted by the writers */
+ private Set<Long> failedKeySet = new ConcurrentSkipListSet<Long>();
+
+ /**
+ * The total size of the temporary inserted key set that have not yet lined
+ * up in a our contiguous sequence starting from startKey. Supposed to stay
+ * small.
+ */
+ private AtomicLong insertedKeyQueueSize = new AtomicLong();
+
+ /** Enable this if used in conjunction with a concurrent reader. */
+ private boolean trackInsertedKeys;
+
+ public MultiThreadedWriter(Configuration conf, byte[] tableName,
+ byte[] columnFamily) {
+ super(conf, tableName, columnFamily, "W");
+ }
+
+ /** Use multi-puts vs. separate puts for every column in a row */
+ public void setMultiPut(boolean isMultiPut) {
+ this.isMultiPut = isMultiPut;
+ }
+
+ public void setColumnsPerKey(long minColumnsPerKey, long maxColumnsPerKey) {
+ this.minColumnsPerKey = minColumnsPerKey;
+ this.maxColumnsPerKey = maxColumnsPerKey;
+ }
+
+ @Override
+ public void start(long startKey, long endKey, int numThreads)
+ throws IOException {
+ super.start(startKey, endKey, numThreads);
+
+ if (verbose) {
+ LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
+ }
+
+ nextKeyToInsert.set(startKey);
+ insertedUpToKey.set(startKey - 1);
+
+ for (int i = 0; i < numThreads; ++i) {
+ HBaseWriterThread writer = new HBaseWriterThread(i);
+ writers.add(writer);
+ }
+
+ if (trackInsertedKeys) {
+ new Thread(new InsertedKeysTracker()).start();
+ numThreadsWorking.incrementAndGet();
+ }
+
+ startThreads(writers);
+ }
+
+ public static byte[] longToByteArrayKey(long rowKey) {
+ return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
+ }
+
+ private class HBaseWriterThread extends Thread {
+ private final HTable table;
+ private final int writerId;
+
+ private final Random random = new Random();
+ private final LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(
+ minDataSize, maxDataSize);
+
+ public HBaseWriterThread(int writerId) throws IOException {
+ setName(getClass().getSimpleName() + "_" + writerId);
+ table = new HTable(conf, tableName);
+ this.writerId = writerId;
+ }
+
+ public void run() {
+ try {
+ long rowKey;
+ while ((rowKey = nextKeyToInsert.getAndIncrement()) < endKey) {
+ long numColumns = minColumnsPerKey + Math.abs(random.nextLong())
+ % (maxColumnsPerKey - minColumnsPerKey);
+ numKeys.addAndGet(1);
+ if (isMultiPut) {
+ multiPutInsertKey(rowKey, 0, numColumns);
+ } else {
+ for (long col = 0; col < numColumns; ++col) {
+ insert(rowKey, col);
+ }
+ }
+ if (trackInsertedKeys) {
+ insertedKeys.add(rowKey);
+ }
+ }
+ } finally {
+ try {
+ table.close();
+ } catch (IOException e) {
+ LOG.error("Error closing table", e);
+ }
+ numThreadsWorking.decrementAndGet();
+ }
+ }
+
+ public void insert(long rowKey, long col) {
+ Put put = new Put(longToByteArrayKey(rowKey));
+ String colAsStr = String.valueOf(col);
+ put.add(columnFamily, colAsStr.getBytes(),
+ dataGenerator.generateRandomSizeValue(rowKey, colAsStr));
+ try {
+ long start = System.currentTimeMillis();
+ table.put(put);
+ numCols.addAndGet(1);
+ totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
+ } catch (IOException e) {
+ failedKeySet.add(rowKey);
+ LOG.error("Failed to insert: " + rowKey);
+ e.printStackTrace();
+ }
+ }
+
+ public void multiPutInsertKey(long rowKey, long startCol, long endCol) {
+ if (verbose) {
+ LOG.debug("Preparing put for key = " + rowKey + ", cols = ["
+ + startCol + ", " + endCol + ")");
+ }
+
+ if (startCol >= endCol) {
+ return;
+ }
+
+ Put put = new Put(LoadTestKVGenerator.md5PrefixedKey(
+ rowKey).getBytes());
+ byte[] columnQualifier;
+ byte[] value;
+ for (long i = startCol; i < endCol; ++i) {
+ String qualStr = String.valueOf(i);
+ columnQualifier = qualStr.getBytes();
+ value = dataGenerator.generateRandomSizeValue(rowKey, qualStr);
+ put.add(columnFamily, columnQualifier, value);
+ }
+
+ try {
+ long start = System.currentTimeMillis();
+ table.put(put);
+ numCols.addAndGet(endCol - startCol);
+ totalOpTimeMs.addAndGet(
+ System.currentTimeMillis() - start);
+ } catch (IOException e) {
+ failedKeySet.add(rowKey);
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * A thread that keeps track of the highest key in the contiguous range of
+ * inserted keys.
+ */
+ private class InsertedKeysTracker implements Runnable {
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName(getClass().getSimpleName());
+ try {
+ long expectedKey = startKey;
+ Queue<Long> sortedKeys = new PriorityQueue<Long>();
+ while (expectedKey < endKey) {
+ // Block until a new element is available.
+ Long k;
+ try {
+ k = insertedKeys.poll(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.info("Inserted key tracker thread interrupted", e);
+ break;
+ }
+ if (k == null) {
+ continue;
+ }
+ if (k == expectedKey) {
+ // Skip the "sorted key" queue and consume this key.
+ insertedUpToKey.set(k);
+ ++expectedKey;
+ } else {
+ sortedKeys.add(k);
+ }
+
+ // See if we have a sequence of contiguous keys lined up.
+ while (!sortedKeys.isEmpty()
+ && ((k = sortedKeys.peek()) == expectedKey)) {
+ sortedKeys.poll();
+ insertedUpToKey.set(k);
+ ++expectedKey;
+ }
+
+ insertedKeyQueueSize.set(insertedKeys.size() + sortedKeys.size());
+ }
+ } catch (Exception ex) {
+ LOG.error("Error in inserted key tracker", ex);
+ } finally {
+ numThreadsWorking.decrementAndGet();
+ }
+ }
+
+ }
+
+ @Override
+ public void waitForFinish() {
+ super.waitForFinish();
+ System.out.println("Failed to write keys: " + failedKeySet.size());
+ for (Long key : failedKeySet) {
+ System.out.println("Failed to write key: " + key);
+ }
+ }
+
+ public int getNumWriteFailures() {
+ return failedKeySet.size();
+ }
+
+ /**
+ * The max key until which all keys have been inserted (successfully or not).
+ * @return the last key that we have inserted all keys up to (inclusive)
+ */
+ public long insertedUpToKey() {
+ return insertedUpToKey.get();
+ }
+
+ public boolean failedToWriteKey(long k) {
+ return failedKeySet.contains(k);
+ }
+
+ @Override
+ protected String progressInfo() {
+ StringBuilder sb = new StringBuilder();
+ appendToStatus(sb, "insertedUpTo", insertedUpToKey.get());
+ appendToStatus(sb, "insertedQSize", insertedKeyQueueSize.get());
+ return sb.toString();
+ }
+
+ /**
+ * Used for a joint write/read workload. Enables tracking the last inserted
+ * key, which requires a blocking queue and a consumer thread.
+ * @param enable whether to enable tracking the last inserted key
+ */
+ void setTrackInsertedKeys(boolean enable) {
+ trackInsertedKeys = enable;
+ }
+
+}
Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestLoadTestKVGenerator.java Thu May 17 22:23:29 2012
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.junit.Test;
+
+public class TestLoadTestKVGenerator {
+
+ private static final int MIN_LEN = 10;
+ private static final int MAX_LEN = 20;
+
+ private Random rand = new Random(28937293L);
+ private LoadTestKVGenerator gen = new LoadTestKVGenerator(MIN_LEN, MAX_LEN);
+
+ @Test
+ public void testValueLength() {
+ for (int i = 0; i < 1000; ++i) {
+ byte[] v = gen.generateRandomSizeValue(i,
+ String.valueOf(rand.nextInt()));
+ assertTrue(MIN_LEN <= v.length);
+ assertTrue(v.length <= MAX_LEN);
+ }
+ }
+
+ @Test
+ public void testVerification() {
+ for (int i = 0; i < 1000; ++i) {
+ for (int qualIndex = 0; qualIndex < 20; ++qualIndex) {
+ String qual = String.valueOf(qualIndex);
+ byte[] v = gen.generateRandomSizeValue(i, qual);
+ String rowKey = LoadTestKVGenerator.md5PrefixedKey(i);
+ assertTrue(LoadTestKVGenerator.verify(rowKey, qual, v));
+ v[0]++;
+ assertFalse(LoadTestKVGenerator.verify(rowKey, qual, v));
+ }
+ }
+ }
+
+ @Test
+ public void testCorrectAndUniqueKeys() {
+ Set<String> keys = new HashSet<String>();
+ for (int i = 0; i < 1000; ++i) {
+ String k = LoadTestKVGenerator.md5PrefixedKey(i);
+ assertFalse("Already have key '" + k + "'", keys.contains(k));
+ assertTrue("Got '" + k + "', expected it to end with '-" + i + "'",
+ k.endsWith("-" + i));
+ keys.add(k);
+ }
+ }
+
+}
Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadParallel.java Thu May 17 22:23:29 2012
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * A write/read/verify load test on a mini HBase cluster. Tests reading
+ * and writing at the same time.
+ */
+@RunWith(Parameterized.class)
+public class TestMiniClusterLoadParallel
+ extends TestMiniClusterLoadSequential {
+
+ public TestMiniClusterLoadParallel(boolean isMultiPut) {
+ super(isMultiPut);
+ }
+
+ @Test(timeout=TIMEOUT_MS)
+ public void loadTest() throws Exception {
+ prepareForLoadTest();
+
+ readerThreads.linkToWriter(writerThreads);
+
+ writerThreads.start(0, numKeys, NUM_THREADS);
+ readerThreads.start(0, numKeys, NUM_THREADS);
+
+ writerThreads.waitForFinish();
+ readerThreads.waitForFinish();
+
+ assertEquals(0, writerThreads.getNumWriteFailures());
+ assertEquals(0, readerThreads.getNumReadFailures());
+ assertEquals(0, readerThreads.getNumReadErrors());
+ assertEquals(numKeys, readerThreads.getNumUniqueKeysVerified());
+ }
+
+}
Added: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java?rev=1339890&view=auto
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java (added)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/util/TestMiniClusterLoadSequential.java Thu May 17 22:23:29 2012
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * A write/read/verify load test on a mini HBase cluster. Tests reading
+ * and then writing.
+ */
+@RunWith(Parameterized.class)
+public class TestMiniClusterLoadSequential {
+
+ private static final Log LOG = LogFactory.getLog(
+ TestMiniClusterLoadSequential.class);
+
+ protected static final byte[] TABLE = Bytes.toBytes("load_test_tbl");
+ protected static final byte[] CF = Bytes.toBytes("load_test_cf");
+ protected static final int NUM_THREADS = 8;
+ protected static final int NUM_RS = 2;
+ protected static final int TIMEOUT_MS = 120000;
+ protected static final HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+
+ protected final Configuration conf = TEST_UTIL.getConfiguration();
+ protected final boolean isMultiPut;
+
+ protected MultiThreadedWriter writerThreads;
+ protected MultiThreadedReader readerThreads;
+ protected int numKeys;
+
+ protected Compression.Algorithm compression = Compression.Algorithm.NONE;
+
+ public TestMiniClusterLoadSequential(boolean isMultiPut) {
+ this.isMultiPut = isMultiPut;
+ conf.setInt("hbase.hregion.memstore.flush.size", 1024 * 1024);
+ // We don't want any region reassignments by the load balancer during the test.
+ conf.setFloat("hbase.regions.slop", 10.0f);
+ }
+
+ @Parameters
+ public static Collection<Object[]> parameters() {
+ List<Object[]> parameters = new ArrayList<Object[]>();
+ for (boolean multiPut : new boolean[]{false, true}) {
+ parameters.add(new Object[]{multiPut});
+ }
+ return parameters;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ LOG.debug("Test setup: isMultiPut=" + isMultiPut);
+ TEST_UTIL.startMiniCluster(1, NUM_RS);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ LOG.debug("Test teardown: isMultiPut=" + isMultiPut);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test(timeout=TIMEOUT_MS)
+ public void loadTest() throws Exception {
+ prepareForLoadTest();
+ runLoadTestOnExistingTable();
+ }
+
+ protected void runLoadTestOnExistingTable() throws IOException {
+ writerThreads.start(0, numKeys, NUM_THREADS);
+ writerThreads.waitForFinish();
+ assertEquals(0, writerThreads.getNumWriteFailures());
+
+ readerThreads.start(0, numKeys, NUM_THREADS);
+ readerThreads.waitForFinish();
+ assertEquals(0, readerThreads.getNumReadFailures());
+ assertEquals(0, readerThreads.getNumReadErrors());
+ assertEquals(numKeys, readerThreads.getNumKeysVerified());
+ }
+
+ protected void prepareForLoadTest() throws IOException {
+ LOG.info("Starting load test: isMultiPut=" + isMultiPut);
+ numKeys = numKeys();
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ while (admin.getClusterStatus().getServers().size() < NUM_RS) {
+ LOG.info("Sleeping until " + NUM_RS + " RSs are online");
+ Threads.sleepWithoutInterrupt(1000);
+ }
+ admin.close();
+
+ int numRegions = HBaseTestingUtility.createPreSplitLoadTestTable(conf,
+ TABLE, CF, compression);
+
+ TEST_UTIL.waitUntilAllRegionsAssigned(numRegions);
+
+ writerThreads = new MultiThreadedWriter(conf, TABLE, CF);
+ writerThreads.setMultiPut(isMultiPut);
+ readerThreads = new MultiThreadedReader(conf, TABLE, CF, 100);
+ }
+
+ protected int numKeys() {
+ return 10000;
+ }
+
+ protected HColumnDescriptor getColumnDesc(HBaseAdmin admin)
+ throws TableNotFoundException, IOException {
+ return admin.getTableDescriptor(TABLE).getFamily(CF);
+ }
+
+}