You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2014/04/08 20:18:41 UTC
svn commit: r1585807 - in /hbase/branches/hbase-10070:
hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
Author: ndimiduk
Date: Tue Apr 8 18:18:41 2014
New Revision: 1585807
URL: http://svn.apache.org/r1585807
Log:
HBASE-10791 Add integration test to demonstrate performance improvement
Added:
hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
Modified:
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
Added: hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java?rev=1585807&view=auto
==============================================================================
--- hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java (added)
+++ hbase/branches/hbase-10070/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java Tue Apr 8 18:18:41 2014
@@ -0,0 +1,338 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import com.google.common.base.Objects;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction;
+import org.apache.hadoop.hbase.chaos.actions.RestartRsHoldingTableAction;
+import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
+import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
+import org.apache.hadoop.hbase.chaos.policies.Policy;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import static java.lang.String.format;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for comparing the performance impact of region replicas. Uses
+ * components of {@link PerformanceEvaluation}. Does not run from
+ * {@code IntegrationTestsDriver} because IntegrationTestBase is incompatible
+ * with the JUnit runner. Hence no @Test annotations either. See {@code -help}
+ * for full list of options.
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
+
+ private static final Log LOG = LogFactory.getLog(IntegrationTestRegionReplicaPerf.class);
+
+ // Command-line option names (*_KEY) and their string defaults (*_DEFAULT); the defaults are
+ // kept as strings because they are fed to CommandLine.getOptionValue and echoed in help text.
+ private static final String SLEEP_TIME_KEY = "sleeptime";
+ // short default interval because tests don't run very long.
+ private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000l);
+ private static final String TABLE_NAME_KEY = "tableName";
+ private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf";
+ private static final String NOMAPRED_KEY = "nomapred";
+ private static final boolean NOMAPRED_DEFAULT = false;
+ private static final String REPLICA_COUNT_KEY = "replicas";
+ private static final String REPLICA_COUNT_DEFAULT = "" + 3;
+ private static final String PRIMARY_TIMEOUT_KEY = "timeout";
+ // NOTE(review): the value is 10000 while the comment and help text say 10 ms, which implies
+ // the setting is expressed in microseconds -- confirm against the client configuration docs.
+ private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms
+ private static final String NUM_RS_KEY = "numRs";
+ private static final String NUM_RS_DEFAULT = "" + 3;
+
+ // Parsed option values; populated by processOptions.
+ private TableName tableName;
+ // Passed to the chaos monkey policy and to the restart action in setUpMonkey.
+ private long sleepTime;
+ private boolean nomapred = NOMAPRED_DEFAULT;
+ private int replicaCount;
+ // Written to hbase.client.primaryCallTimeout.get and .multiget in setUp.
+ private int primaryTimeout;
+ // Number of region servers requested from initializeCluster in setUpCluster.
+ private int clusterSize;
+
+ /**
+  * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}. Each instance
+  * holds one tokenized PerformanceEvaluation command line and, when called, runs it and reports
+  * the row count and elapsed time as a {@link TimingResult}.
+  */
+ static class PerfEvalCallable implements Callable<TimingResult> {
+   private final Queue<String> argv = new LinkedList<String>();
+   private final HBaseAdmin admin;
+
+   /**
+    * @param admin admin handle used to check/create the table and to obtain the configuration
+    * @param argv whitespace-separated PerformanceEvaluation arguments
+    */
+   public PerfEvalCallable(HBaseAdmin admin, String argv) {
+     // TODO: this API is awkward, should take HConnection, not HBaseAdmin
+     this.admin = admin;
+     // Split on runs of whitespace and drop blank tokens. Callers assemble the line with
+     // format(), and an optional flag that expands to "" leaves leading or doubled spaces
+     // which would otherwise be handed to the parser as empty-string arguments.
+     for (String token : argv.split("\\s+")) {
+       if (!token.isEmpty()) {
+         this.argv.add(token);
+       }
+     }
+     LOG.debug("Created PerformanceEvaluationCallable with args: " + argv);
+   }
+
+   /**
+    * Parses the stored arguments, ensures the table is in the requested state, and runs the
+    * evaluation either with local client threads or as a MapReduce job.
+    *
+    * @return rows processed and elapsed time; in MapReduce mode both come from job counters,
+    *         in local mode the row count is {@code opts.totalRows}
+    */
+   @Override
+   public TimingResult call() throws Exception {
+     PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
+     PerformanceEvaluation.checkTable(admin, opts);
+     long numRows = opts.totalRows;
+     long elapsedTime;
+     if (opts.nomapred) {
+       elapsedTime = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
+     } else {
+       Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
+       Counters counters = job.getCounters();
+       numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
+       elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
+     }
+     return new TimingResult(numRows, elapsedTime);
+   }
+ }
+
+ /**
+ * Record the results from a single {@link PerformanceEvaluation} job run.
+ */
+ static class TimingResult {
+ public long numRows;
+ public long elapsedTime;
+
+ public TimingResult(long numRows, long elapsedTime) {
+ this.numRows = numRows;
+ this.elapsedTime = elapsedTime;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("numRows", numRows)
+ .add("elapsedTime", elapsedTime)
+ .toString();
+ }
+ }
+
+ /**
+  * Runs the base-class setup, verifies the two cluster settings this test relies on, and
+  * then applies the client-side settings used for the reads.
+  */
+ @Override
+ public void setUp() throws Exception {
+   super.setUp();
+   final Configuration clusterConf = util.getConfiguration();
+
+   // sanity check cluster
+   // TODO: this should reach out to master and verify online state instead
+   final String balancerClass = clusterConf.get("hbase.master.loadbalancer.class");
+   assertEquals("Master must be configured with StochasticLoadBalancer",
+     "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer",
+     balancerClass);
+   // TODO: this should reach out to master and verify online state instead
+   final long refreshPeriod =
+     clusterConf.getLong("hbase.regionserver.storefile.refresh.period", 0);
+   assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.",
+     refreshPeriod > 0);
+
+   // enable client-side settings
+   clusterConf.setBoolean(RpcClient.ALLOWS_INTERRUPTS, true);
+   // TODO: expose these settings to CLI override
+   for (String timeoutKey : new String[] {
+       "hbase.client.primaryCallTimeout.get",
+       "hbase.client.primaryCallTimeout.multiget" }) {
+     clusterConf.setLong(timeoutKey, primaryTimeout);
+   }
+ }
+
+ /** Builds the testing utility from the tool configuration and sizes the cluster. */
+ @Override
+ public void setUpCluster() throws Exception {
+   final Configuration toolConf = getConf();
+   util = getTestingUtil(toolConf);
+   util.initializeCluster(clusterSize);
+ }
+
+ /**
+  * Prepares, but does not start, a chaos monkey that periodically either restarts a region
+  * server holding the test table or moves one of the table's regions.
+  */
+ @Override
+ public void setUpMonkey() throws Exception {
+   final String table = tableName.getNameAsString();
+   final RestartRsHoldingTableAction restartHolder =
+     new RestartRsHoldingTableAction(sleepTime, table);
+   final MoveRandomRegionOfTableAction moveRegion = new MoveRandomRegionOfTableAction(table);
+   final Policy chaosPolicy = new PeriodicRandomActionPolicy(sleepTime, restartHolder, moveRegion);
+   // don't start monkey right away
+   this.monkey = new PolicyBasedChaosMonkey(util, chaosPolicy);
+ }
+
+ @Override
+ protected void addOptions() {
+ addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '"
+ + TABLE_NAME_DEFAULT + "'");
+ addOptWithArg(SLEEP_TIME_KEY, "How long the monkey sleeps between actions. Default: "
+ + SLEEP_TIME_DEFAULT);
+ addOptNoArg(NOMAPRED_KEY,
+ "Run multiple clients using threads (rather than use mapreduce)");
+ addOptWithArg(REPLICA_COUNT_KEY, "Number of region replicas. Default: "
+ + REPLICA_COUNT_DEFAULT);
+ addOptWithArg(PRIMARY_TIMEOUT_KEY, "Overrides hbase.client.primaryCallTimeout. Default: "
+ + PRIMARY_TIMEOUT_DEFAULT + " (10ms)");
+ addOptWithArg(NUM_RS_KEY, "Specify the number of RegionServers to use. Default: "
+ + NUM_RS_DEFAULT);
+ }
+
+ @Override
+ protected void processOptions(CommandLine cmd) {
+ tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT));
+ sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT));
+ nomapred = cmd.hasOption(NOMAPRED_KEY);
+ replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT));
+ primaryTimeout =
+ Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT));
+ clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT));
+ LOG.debug(Objects.toStringHelper("Parsed Options")
+ .add(TABLE_NAME_KEY, tableName)
+ .add(SLEEP_TIME_KEY, sleepTime)
+ .add(NOMAPRED_KEY, nomapred)
+ .add(REPLICA_COUNT_KEY, replicaCount)
+ .add(PRIMARY_TIMEOUT_KEY, primaryTimeout)
+ .add(NUM_RS_KEY, clusterSize)
+ .toString());
+ }
+
+ /**
+  * Command-line entry point for the test body: runs {@link #test()} and returns 0.
+  * Any failure surfaces as an exception or assertion error thrown by {@code test()}.
+  */
+ @Override
+ public int runTestFromCommandLine() throws Exception {
+ test();
+ return 0;
+ }
+
+ /** Returns the name of the table under test, as a plain string. */
+ @Override
+ public String getTablename() {
+ return tableName.getNameAsString();
+ }
+
+ /**
+  * Always returns {@code null}; this test does not enumerate column families.
+  * NOTE(review): confirm how the base class treats a null set.
+  */
+ @Override
+ protected Set<String> getColumnFamilies() {
+ return null;
+ }
+
+ /**
+ * Modify a table, synchronous. Waiting logic similar to that of {@code admin.rb#alter_status}.
+ */
+ private static void modifyTableSync(HBaseAdmin admin, HTableDescriptor desc) throws Exception {
+ admin.modifyTable(desc.getTableName(), desc);
+ Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
+ setFirst(0);
+ setSecond(0);
+ }};
+ for (int i = 0; status.getFirst() != 0 && i < 500; i++) { // wait up to 500 seconds
+ status = admin.getAlterStatus(desc.getTableName());
+ if (status.getSecond() != 0) {
+ LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
+ + " regions updated.");
+ Thread.sleep(1 * 1000l);
+ } else {
+ LOG.debug("All regions updated.");
+ }
+ }
+ if (status.getSecond() != 0) {
+ throw new Exception("Failed to update replica count after 500 seconds.");
+ }
+ }
+
+ /**
+ * Set the number of Region replicas.
+ */
+ private static void setReplicas(HBaseAdmin admin, TableName table, int replicaCount)
+ throws Exception {
+ admin.disableTable(table);
+ HTableDescriptor desc = admin.getTableDescriptor(table);
+ desc.setRegionReplication(replicaCount);
+ modifyTableSync(admin, desc);
+ admin.enableTable(table);
+ }
+
+ /**
+  * Compares random-read timings with and without region replicas while a chaos monkey is
+  * running. Populates the table, collects a baseline without replicas, alters the table to
+  * the requested replica count, collects a second set of timings, and asserts that the mean
+  * elapsed time with replicas is no worse than the baseline.
+  */
+ public void test() throws Exception {
+   int maxIters = 3;
+   String mr = nomapred ? "--nomapred" : "";
+   String replicas = "--replicas=" + replicaCount;
+   // TODO: splits disabled until "phase 2" is complete.
+   String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName();
+   String writeOpts = format("%s %s --table=%s --presplit=16 sequentialWrite 4",
+     mr, splitPolicy, tableName);
+   String readOpts =
+     format("%s --table=%s --latency --sampleRate=0.1 randomRead 4", mr, tableName);
+   String replicaReadOpts = format("%s %s", replicas, readOpts);
+
+   // create/populate the table, replicas disabled
+   LOG.debug("Populating table.");
+   new PerfEvalCallable(util.getHBaseAdmin(), writeOpts).call();
+
+   // one last sanity check, then send in the clowns!
+   assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.",
+     DisabledRegionSplitPolicy.class.getName(),
+     util.getHBaseAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName());
+   startMonkey();
+
+   // collect a baseline without region replicas.
+   ArrayList<TimingResult> resultsWithoutReplica =
+     runReadJobs("non-replica", readOpts, maxIters);
+
+   // disable monkey, enable region replicas, enable monkey
+   cleanUpMonkey("Altering table.");
+   LOG.debug("Altering " + tableName + " replica count to " + replicaCount);
+   setReplicas(util.getHBaseAdmin(), tableName, replicaCount);
+   setUpMonkey();
+   startMonkey();
+
+   // run test with region replicas.
+   ArrayList<TimingResult> resultsWithReplica =
+     runReadJobs("replica", replicaReadOpts, maxIters);
+
+   DescriptiveStatistics withoutReplicaStats = elapsedTimeStats(resultsWithoutReplica);
+   DescriptiveStatistics withReplicaStats = elapsedTimeStats(resultsWithReplica);
+
+   LOG.info(Objects.toStringHelper("testName")
+     .add("withoutReplicas", resultsWithoutReplica)
+     .add("withReplicas", resultsWithReplica)
+     .add("withoutReplicasMean", withoutReplicaStats.getMean())
+     .add("withReplicasMean", withReplicaStats.getMean())
+     .toString());
+
+   assertTrue(
+     "Running with region replicas under chaos should be as fast or faster than without. "
+       + "withReplicas.mean: " + withReplicaStats.getMean() + "ms "
+       + "withoutReplicas.mean: " + withoutReplicaStats.getMean() + "ms.",
+     withReplicaStats.getMean() <= withoutReplicaStats.getMean());
+ }
+
+ /**
+  * Runs the given PerformanceEvaluation command line {@code iterations} times in sequence,
+  * pausing five seconds after each run, and returns the timings in run order.
+  *
+  * @param label word used in the per-run debug message ("non-replica" or "replica")
+  */
+ private ArrayList<TimingResult> runReadJobs(String label, String opts, int iterations)
+     throws Exception {
+   ArrayList<TimingResult> timings = new ArrayList<TimingResult>(iterations);
+   for (int i = 0; i < iterations; i++) {
+     LOG.debug("Launching " + label + " job " + (i + 1) + "/" + iterations);
+     timings.add(new PerfEvalCallable(util.getHBaseAdmin(), opts).call());
+     // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
+     Thread.sleep(5000L);
+   }
+   return timings;
+ }
+
+ /** Collects the elapsed time of every result into one statistics accumulator. */
+ private static DescriptiveStatistics elapsedTimeStats(Iterable<TimingResult> results) {
+   DescriptiveStatistics stats = new DescriptiveStatistics();
+   for (TimingResult tr : results) {
+     stats.addValue(tr.elapsedTime);
+   }
+   return stats;
+ }
+
+ /**
+  * Launches the test as a tool against a distributed cluster and exits the JVM with the
+  * tool's return code.
+  */
+ public static void main(String[] args) throws Exception {
+   final Configuration distributedConf = HBaseConfiguration.create();
+   IntegrationTestingUtility.setUseDistributedCluster(distributedConf);
+   final IntegrationTestRegionReplicaPerf tool = new IntegrationTestRegionReplicaPerf();
+   System.exit(ToolRunner.run(distributedConf, tool, args));
+ }
+}
Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=1585807&r1=1585806&r2=1585807&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java Tue Apr 8 18:18:41 2014
@@ -28,7 +28,9 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.Queue;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
@@ -37,6 +39,7 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import com.google.common.base.Objects;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +48,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -86,16 +90,17 @@ import static org.codehaus.jackson.map.S
* client that steps through one of a set of hardcoded tests or 'experiments'
* (e.g. a random reads test, a random writes test, etc.). Pass on the
* command-line which test to run and how many clients are participating in
- * this experiment. Run <code>java PerformanceEvaluation --help</code> to
- * obtain usage.
+ * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
*
* <p>This class sets up and runs the evaluation programs described in
* Section 7, <i>Performance Evaluation</i>, of the <a
* href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
* paper, pages 8-10.
*
- * <p>If number of clients > 1, we start up a MapReduce job. Each map task
- * runs an individual client. Each client does about 1GB of data.
+ * <p>By default, runs as a mapreduce job where each mapper runs a single test
+ * client. Can also run as a non-mapreduce, multithreaded application by
+ * specifying {@code --nomapred}. Each client does about 1GB of data, unless
+ * specified otherwise.
*/
public class PerformanceEvaluation extends Configured implements Tool {
protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
@@ -116,10 +121,35 @@ public class PerformanceEvaluation exten
private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
private static final TestOptions DEFAULT_OPTS = new TestOptions();
- protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
-
+ private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<String, CmdDescriptor>();
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
+ // Register every built-in test under its command-line name, once, at class-load time.
+ // Each call stores a descriptor (test class, name, help text) in the static COMMANDS map.
+ static {
+ addCommandDescriptor(RandomReadTest.class, "randomRead",
+ "Run random read test");
+ addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
+ "Run random seek and scan 100 test");
+ addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
+ "Run random seek scan with both start and stop row (max 10 rows)");
+ addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
+ "Run random seek scan with both start and stop row (max 100 rows)");
+ addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
+ "Run random seek scan with both start and stop row (max 1000 rows)");
+ addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
+ "Run random seek scan with both start and stop row (max 10000 rows)");
+ addCommandDescriptor(RandomWriteTest.class, "randomWrite",
+ "Run random write test");
+ addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
+ "Run sequential read test");
+ addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
+ "Run sequential write test");
+ addCommandDescriptor(ScanTest.class, "scan",
+ "Run scan test (read every row)");
+ addCommandDescriptor(FilteredScanTest.class, "filterScan",
+ "Run scan test using a filter to find a specific row based on it's value " +
+ "(make sure to use --rows=20)");
+ }
+
/**
* Enum for map metrics. Keep it out here rather than inside in the Map
* inner-class so we can find associated properties.
@@ -137,37 +167,12 @@ public class PerformanceEvaluation exten
*/
public PerformanceEvaluation(final Configuration conf) {
super(conf);
-
- addCommandDescriptor(RandomReadTest.class, "randomRead",
- "Run random read test");
- addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
- "Run random seek and scan 100 test");
- addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
- "Run random seek scan with both start and stop row (max 10 rows)");
- addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
- "Run random seek scan with both start and stop row (max 100 rows)");
- addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
- "Run random seek scan with both start and stop row (max 1000 rows)");
- addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
- "Run random seek scan with both start and stop row (max 10000 rows)");
- addCommandDescriptor(RandomWriteTest.class, "randomWrite",
- "Run random write test");
- addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
- "Run sequential read test");
- addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
- "Run sequential write test");
- addCommandDescriptor(ScanTest.class, "scan",
- "Run scan test (read every row)");
- addCommandDescriptor(FilteredScanTest.class, "filterScan",
- "Run scan test using a filter to find a specific row based on it's value " +
- "(make sure to use --rows=20)");
}
- protected void addCommandDescriptor(Class<? extends Test> cmdClass,
+ protected static void addCommandDescriptor(Class<? extends Test> cmdClass,
String name, String description) {
- CmdDescriptor cmdDescriptor =
- new CmdDescriptor(cmdClass, name, description);
- commands.put(name, cmdDescriptor);
+ CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
+ COMMANDS.put(name, cmdDescriptor);
}
/**
@@ -245,35 +250,62 @@ public class PerformanceEvaluation exten
}
/*
- * If table does not already exist, create.
- * @param c Client to use checking.
- * @return True if we created the table.
- * @throws IOException
+ * If table does not already exist, create. Also create a table when
+ * {@code opts.presplitRegions} is specified or when the existing table's
+ * region replica count doesn't match {@code opts.replicas}.
*/
- private static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
- HTableDescriptor tableDescriptor = getTableDescriptor(opts);
- if (opts.presplitRegions > 0) {
- // presplit requested
- if (admin.tableExists(tableDescriptor.getTableName())) {
- admin.disableTable(tableDescriptor.getTableName());
- admin.deleteTable(tableDescriptor.getTableName());
- }
-
- byte[][] splits = getSplits(opts);
- for (int i=0; i < splits.length; i++) {
- LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
- }
- admin.createTable(tableDescriptor, splits);
- LOG.info ("Table created with " + opts.presplitRegions + " splits");
- }
- else {
- boolean tableExists = admin.tableExists(tableDescriptor.getTableName());
- if (!tableExists) {
- admin.createTable(tableDescriptor);
- LOG.info("Table " + tableDescriptor + " created");
+ static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
+   TableName tableName = TableName.valueOf(opts.tableName);
+   boolean needsDelete = false, exists = admin.tableExists(tableName);
+   // Commands whose name mentions "read" or "scan" need data that is already there.
+   boolean isReadCmd = opts.cmdName.toLowerCase().contains("read")
+     || opts.cmdName.toLowerCase().contains("scan");
+   if (!exists && isReadCmd) {
+     throw new IllegalStateException(
+       "Must specify an existing table for read commands. Run a write command first.");
+   }
+   HTableDescriptor desc = exists ? admin.getTableDescriptor(tableName) : null;
+   byte[][] splits = getSplits(opts);
+
+   // Compare split policy class names by value, null-safe. A reference comparison reports a
+   // mismatch whenever the two equal names are distinct String objects, which would drop
+   // and recreate a table that already matches the request.
+   boolean splitPolicyDiffers = false;
+   if (desc != null) {
+     String currentPolicy = desc.getRegionSplitPolicyClassName();
+     splitPolicyDiffers = currentPolicy == null
+       ? opts.splitPolicy != null
+       : !currentPolicy.equals(opts.splitPolicy);
+   }
+
+   // recreate the table when user has requested presplit or when existing
+   // {RegionSplitPolicy,replica count} does not match requested.
+   if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
+     || (!isReadCmd && splitPolicyDiffers)
+     || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)) {
+     needsDelete = true;
+     // wait, why did it delete my table?!?
+     LOG.debug(Objects.toStringHelper("needsDelete")
+       .add("needsDelete", needsDelete)
+       .add("isReadCmd", isReadCmd)
+       .add("exists", exists)
+       .add("desc", desc)
+       .add("presplit", opts.presplitRegions)
+       .add("splitPolicy", opts.splitPolicy)
+       .add("replicas", opts.replicas)
+       .toString());
+   }
+
+   // remove an existing table
+   if (needsDelete) {
+     if (admin.isTableEnabled(tableName)) {
+       admin.disableTable(tableName);
+     }
+     admin.deleteTable(tableName);
+   }
+
+   // table creation is necessary
+   if (!exists || needsDelete) {
+     desc = getTableDescriptor(opts);
+     if (splits != null) {
+       if (LOG.isDebugEnabled()) {
+         for (int i = 0; i < splits.length; i++) {
+           LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
+         }
+       }
+     }
+     // splits is null when no presplit was requested
+     admin.createTable(desc, splits);
+     LOG.info("Table " + desc + " created");
+   }
+   return admin.tableExists(tableName);
}
/**
@@ -288,6 +320,12 @@ public class PerformanceEvaluation exten
family.setInMemory(true);
}
desc.addFamily(family);
+ if (opts.replicas != DEFAULT_OPTS.replicas) {
+ desc.setRegionReplication(opts.replicas);
+ }
+ if (opts.splitPolicy != DEFAULT_OPTS.splitPolicy) {
+ desc.setRegionSplitPolicyClassName(opts.splitPolicy);
+ }
return desc;
}
@@ -295,8 +333,8 @@ public class PerformanceEvaluation exten
* generates splits based on total number of rows and specified split regions
*/
protected static byte[][] getSplits(TestOptions opts) {
- if (opts.presplitRegions == 0)
- return new byte [0][];
+ if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
+ return null;
int numSplitPoints = opts.presplitRegions - 1;
byte[][] splits = new byte[numSplitPoints][];
@@ -313,8 +351,10 @@ public class PerformanceEvaluation exten
* @param cmd Command to run.
* @throws IOException
*/
- private void doLocalClients(final Class<? extends Test> cmd, final TestOptions opts)
+ static long doLocalClients(final TestOptions opts, final Configuration conf)
throws IOException, InterruptedException {
+ final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
+ assert cmd != null;
Future<Long>[] threads = new Future[opts.numClientThreads];
long[] timings = new long[opts.numClientThreads];
ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
@@ -326,7 +366,7 @@ public class PerformanceEvaluation exten
public Long call() throws Exception {
TestOptions threadOpts = new TestOptions(opts);
threadOpts.startRow = index * threadOpts.perClientRunRows;
- long elapsedTime = runOneClient(cmd, getConf(), threadOpts, new Status() {
+ long elapsedTime = runOneClient(cmd, conf, threadOpts, new Status() {
public void setStatus(final String msg) throws IOException {
LOG.info("client-" + Thread.currentThread().getName() + " " + msg);
}
@@ -354,9 +394,10 @@ public class PerformanceEvaluation exten
total += timings[i];
}
LOG.info("[" + test + "]"
- + "\tMin: " + timings[0] + "ms"
- + "\tMax: " + timings[timings.length - 1] + "ms"
- + "\tAvg: " + (total / timings.length) + "ms");
+ + "\tMin: " + timings[0] + "ms"
+ + "\tMax: " + timings[timings.length - 1] + "ms"
+ + "\tAvg: " + (total / timings.length) + "ms");
+ return total;
}
/*
@@ -366,15 +407,16 @@ public class PerformanceEvaluation exten
* @param cmd Command to run.
* @throws IOException
*/
- private void doMapReduce(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
- InterruptedException, ClassNotFoundException {
- Configuration conf = getConf();
+ static Job doMapReduce(TestOptions opts, final Configuration conf)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
+ assert cmd != null;
Path inputDir = writeInputFile(conf, opts);
conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
- conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
+ conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
Job job = new Job(conf);
job.setJarByClass(PerformanceEvaluation.class);
- job.setJobName("HBase Performance Evaluation");
+ job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setInputPaths(job, inputDir);
@@ -400,6 +442,7 @@ public class PerformanceEvaluation exten
TableMapReduceUtil.initCredentials(job);
job.waitForCompletion(true);
+ return job;
}
/*
@@ -408,7 +451,7 @@ public class PerformanceEvaluation exten
* @return Directory that contains file written.
* @throws IOException
*/
- private Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
+ private static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
Path inputDir = new Path(jobdir, "inputs");
@@ -480,6 +523,7 @@ public class PerformanceEvaluation exten
public TestOptions() {}
public TestOptions(TestOptions that) {
+ this.cmdName = that.cmdName;
this.nomapred = that.nomapred;
this.startRow = that.startRow;
this.perClientRunRows = that.perClientRunRows;
@@ -495,10 +539,13 @@ public class PerformanceEvaluation exten
this.multiGet = that.multiGet;
this.inMemoryCF = that.inMemoryCF;
this.presplitRegions = that.presplitRegions;
+ this.replicas = that.replicas;
+ this.splitPolicy = that.splitPolicy;
this.compression = that.compression;
this.blockEncoding = that.blockEncoding;
}
+ public String cmdName = null;
public boolean nomapred = false;
public int startRow = 0;
public int perClientRunRows = ROWS_PER_GB;
@@ -512,8 +559,10 @@ public class PerformanceEvaluation exten
public int noOfTags = 1;
public boolean reportLatency = false;
public int multiGet = 0;
- boolean inMemoryCF = false;
- int presplitRegions = 0;
+ public boolean inMemoryCF = false;
+ public int presplitRegions = 0;
+ public int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
+ public String splitPolicy = null;
public Compression.Algorithm compression = Compression.Algorithm.NONE;
public DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
}
@@ -721,12 +770,14 @@ public class PerformanceEvaluation exten
static class RandomReadTest extends Test {
private final int everyN;
private final double[] times;
+ private final Consistency consistency;
private ArrayList<Get> gets;
int idx = 0;
RandomReadTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
+ consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
if (opts.multiGet > 0) {
LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
@@ -744,6 +795,7 @@ public class PerformanceEvaluation exten
if (i % everyN == 0) {
Get get = new Get(getRandomRow(this.rand, opts.totalRows));
get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+ get.setConsistency(consistency);
if (opts.multiGet > 0) {
this.gets.add(get);
if (this.gets.size() == opts.multiGet) {
@@ -1033,9 +1085,9 @@ public class PerformanceEvaluation exten
if (admin != null) admin.close();
}
if (opts.nomapred) {
- doLocalClients(cmd, opts);
+ doLocalClients(opts, getConf());
} else {
- doMapReduce(cmd, opts);
+ doMapReduce(opts, getConf());
}
}
@@ -1074,6 +1126,10 @@ public class PerformanceEvaluation exten
"This works only if usetags is true.");
System.err.println(" latency Set to report operation latencies. " +
"Currently only supported by randomRead test. Default: False");
+ System.err.println(" multiGet Batch gets together into groups of N. Only supported " +
+ "by randomRead. Default: disabled");
+ System.err.println(" replicas Enable region replica testing. Defaults: 1.");
+ System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table.");
System.err.println();
System.err.println(" Note: -D properties will be applied to the conf used. ");
System.err.println(" For example: ");
@@ -1081,7 +1137,7 @@ public class PerformanceEvaluation exten
System.err.println(" -Dmapreduce.task.timeout=60000");
System.err.println();
System.err.println("Command:");
- for (CmdDescriptor command : commands.values()) {
+ for (CmdDescriptor command : COMMANDS.values()) {
System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
}
System.err.println();
@@ -1095,140 +1151,161 @@ public class PerformanceEvaluation exten
+ " sequentialWrite 1");
}
- private static int getNumClients(final int start, final String[] args) {
- if(start + 1 > args.length) {
- throw new IllegalArgumentException("must supply the number of clients");
- }
- int N = Integer.parseInt(args[start]);
- if (N < 1) {
- throw new IllegalArgumentException("Number of clients must be > 1");
- }
- return N;
- }
+ /**
+ * Parse options passed in via an arguments array. Assumes that array has been split
+ * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
+ * in the queue at the conclusion of this method call. It's up to the caller to deal
+ * with these unrecognized arguments.
+ */
+ static TestOptions parseOpts(Queue<String> args) {
+ TestOptions opts = new TestOptions();
- public int run(String[] args) throws Exception {
- // Process command-line args. TODO: Better cmd-line processing
- // (but hopefully something not as painful as cli options).
- int errCode = -1;
- if (args.length < 1) {
- printUsage();
- return errCode;
- }
+ String cmd = null;
+ while ((cmd = args.poll()) != null) {
+ if (cmd.equals("-h") || cmd.startsWith("--h")) {
+ // place item back onto queue so that caller knows parsing was incomplete
+ args.add(cmd);
+ break;
+ }
- try {
- // MR-NOTE: if you are adding a property that is used to control an operation
- // like put(), get(), scan(), ... you must also add it as part of the MR
- // input, take a look at writeInputFile().
- // Then you must adapt the LINE_PATTERN input regex,
- // and parse the argument, take a look at PEInputFormat.getSplits().
-
- TestOptions opts = new TestOptions();
-
- for (int i = 0; i < args.length; i++) {
- String cmd = args[i];
- if (cmd.equals("-h") || cmd.startsWith("--h")) {
- printUsage();
- errCode = 0;
- break;
- }
+ final String nmr = "--nomapred";
+ if (cmd.startsWith(nmr)) {
+ opts.nomapred = true;
+ continue;
+ }
- final String nmr = "--nomapred";
- if (cmd.startsWith(nmr)) {
- opts.nomapred = true;
- continue;
- }
+ final String rows = "--rows=";
+ if (cmd.startsWith(rows)) {
+ opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
+ continue;
+ }
- final String rows = "--rows=";
- if (cmd.startsWith(rows)) {
- opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
- continue;
- }
+ final String sampleRate = "--sampleRate=";
+ if (cmd.startsWith(sampleRate)) {
+ opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
+ continue;
+ }
- final String sampleRate = "--sampleRate=";
- if (cmd.startsWith(sampleRate)) {
- opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
- continue;
- }
+ final String table = "--table=";
+ if (cmd.startsWith(table)) {
+ opts.tableName = cmd.substring(table.length());
+ continue;
+ }
- final String table = "--table=";
- if (cmd.startsWith(table)) {
- opts.tableName = cmd.substring(table.length());
- continue;
- }
+ final String compress = "--compress=";
+ if (cmd.startsWith(compress)) {
+ opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
+ continue;
+ }
- final String compress = "--compress=";
- if (cmd.startsWith(compress)) {
- opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
- continue;
- }
+ final String blockEncoding = "--blockEncoding=";
+ if (cmd.startsWith(blockEncoding)) {
+ opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
+ continue;
+ }
- final String blockEncoding = "--blockEncoding=";
- if (cmd.startsWith(blockEncoding)) {
- opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
- continue;
- }
+ final String flushCommits = "--flushCommits=";
+ if (cmd.startsWith(flushCommits)) {
+ opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
+ continue;
+ }
- final String flushCommits = "--flushCommits=";
- if (cmd.startsWith(flushCommits)) {
- opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
- continue;
- }
+ final String writeToWAL = "--writeToWAL=";
+ if (cmd.startsWith(writeToWAL)) {
+ opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
+ continue;
+ }
- final String writeToWAL = "--writeToWAL=";
- if (cmd.startsWith(writeToWAL)) {
- opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
- continue;
- }
+ final String presplit = "--presplit=";
+ if (cmd.startsWith(presplit)) {
+ opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
+ continue;
+ }
- final String presplit = "--presplit=";
- if (cmd.startsWith(presplit)) {
- opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
- continue;
- }
-
- final String inMemory = "--inmemory=";
- if (cmd.startsWith(inMemory)) {
- opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
- continue;
- }
+ final String inMemory = "--inmemory=";
+ if (cmd.startsWith(inMemory)) {
+ opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
+ continue;
+ }
- final String latency = "--latency";
- if (cmd.startsWith(latency)) {
- opts.reportLatency = true;
- continue;
- }
+ final String latency = "--latency";
+ if (cmd.startsWith(latency)) {
+ opts.reportLatency = true;
+ continue;
+ }
- final String multiGet = "--multiGet=";
- if (cmd.startsWith(multiGet)) {
- opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
- continue;
- }
+ final String multiGet = "--multiGet=";
+ if (cmd.startsWith(multiGet)) {
+ opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
+ continue;
+ }
- final String useTags = "--usetags=";
- if (cmd.startsWith(useTags)) {
- opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
- continue;
- }
+ final String useTags = "--usetags=";
+ if (cmd.startsWith(useTags)) {
+ opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
+ continue;
+ }
- final String noOfTags = "--nooftags=";
- if (cmd.startsWith(noOfTags)) {
- opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
- continue;
- }
+ final String noOfTags = "--nooftags=";
+ if (cmd.startsWith(noOfTags)) {
+ opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
+ continue;
+ }
- Class<? extends Test> cmdClass = determineCommandClass(cmd);
- if (cmdClass != null) {
- opts.numClientThreads = getNumClients(i + 1, args);
- // number of rows specified
- opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
- runTest(cmdClass, opts);
- errCode = 0;
- break;
- }
+ final String replicas = "--replicas=";
+ if (cmd.startsWith(replicas)) {
+ opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
+ continue;
+ }
- printUsage();
+ final String splitPolicy = "--splitPolicy=";
+ if (cmd.startsWith(splitPolicy)) {
+ opts.splitPolicy = cmd.substring(splitPolicy.length());
+ continue;
+ }
+
+ if (isCommandClass(cmd)) {
+ opts.cmdName = cmd;
+ opts.numClientThreads = Integer.parseInt(args.remove());
+ // number of rows specified
+ opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
break;
}
+ }
+ return opts;
+ }
+
+ public int run(String[] args) throws Exception {
+ // Process command-line args. TODO: Better cmd-line processing
+ // (but hopefully something not as painful as cli options).
+ int errCode = -1;
+ if (args.length < 1) {
+ printUsage();
+ return errCode;
+ }
+
+ try {
+ LinkedList<String> argv = new LinkedList<String>();
+ argv.addAll(Arrays.asList(args));
+ TestOptions opts = parseOpts(argv);
+
+ // args remaining, print help and exit
+ if (!argv.isEmpty()) {
+ errCode = 0;
+ printUsage();
+ }
+
+ // must run at least 1 client
+ if (opts.numClientThreads <= 0) {
+ throw new IllegalArgumentException("Number of clients must be > 0");
+ }
+
+ Class<? extends Test> cmdClass = determineCommandClass(opts.cmdName);
+ if (cmdClass != null) {
+ runTest(cmdClass, opts);
+ errCode = 0;
+ }
+
} catch (Exception e) {
e.printStackTrace();
}
@@ -1236,8 +1313,12 @@ public class PerformanceEvaluation exten
return errCode;
}
- private Class<? extends Test> determineCommandClass(String cmd) {
- CmdDescriptor descriptor = commands.get(cmd);
+ private static boolean isCommandClass(String cmd) {
+ return COMMANDS.containsKey(cmd);
+ }
+
+ private static Class<? extends Test> determineCommandClass(String cmd) {
+ CmdDescriptor descriptor = COMMANDS.get(cmd);
return descriptor != null ? descriptor.getCmdClass() : null;
}