You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/06/28 02:31:15 UTC
[29/49] git commit: HBASE-10791 Add integration test to demonstrate
performance improvement
HBASE-10791 Add integration test to demonstrate performance improvement
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1585807 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d313103a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d313103a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d313103a
Branch: refs/heads/master
Commit: d313103aeb66bd6a8081f76578a35e26d5a0fc78
Parents: 7d24752
Author: ndimiduk <nd...@unknown>
Authored: Tue Apr 8 18:18:41 2014 +0000
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Jun 27 16:39:39 2014 -0700
----------------------------------------------------------------------
.../hbase/IntegrationTestRegionReplicaPerf.java | 338 +++++++++++
.../hadoop/hbase/PerformanceEvaluation.java | 598 +++++++++++--------
2 files changed, 678 insertions(+), 258 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d313103a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
new file mode 100644
index 0000000..ca3a8f0
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRegionReplicaPerf.java
@@ -0,0 +1,338 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import com.google.common.base.Objects;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction;
+import org.apache.hadoop.hbase.chaos.actions.RestartRsHoldingTableAction;
+import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
+import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
+import org.apache.hadoop.hbase.chaos.policies.Policy;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import static java.lang.String.format;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for comparing the performance impact of region replicas. Uses
+ * components of {@link PerformanceEvaluation}. Does not run from
+ * {@code IntegrationTestsDriver} because IntegrationTestBase is incompatible
+ * with the JUnit runner. Hence no @Test annotations either. See {@code -help}
+ * for full list of options.
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
+
+ private static final Log LOG = LogFactory.getLog(IntegrationTestRegionReplicaPerf.class);
+
+ private static final String SLEEP_TIME_KEY = "sleeptime";
+ // short default interval because tests don't run very long.
+ private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000l);
+ private static final String TABLE_NAME_KEY = "tableName";
+ private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf";
+ private static final String NOMAPRED_KEY = "nomapred";
+ private static final boolean NOMAPRED_DEFAULT = false;
+ private static final String REPLICA_COUNT_KEY = "replicas";
+ private static final String REPLICA_COUNT_DEFAULT = "" + 3;
+ private static final String PRIMARY_TIMEOUT_KEY = "timeout";
+ private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms
+ private static final String NUM_RS_KEY = "numRs";
+ private static final String NUM_RS_DEFAULT = "" + 3;
+
+ private TableName tableName;
+ private long sleepTime;
+ private boolean nomapred = NOMAPRED_DEFAULT;
+ private int replicaCount;
+ private int primaryTimeout;
+ private int clusterSize;
+
+ /**
+ * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}.
+ */
+ static class PerfEvalCallable implements Callable<TimingResult> {
+ private final Queue<String> argv = new LinkedList<String>();
+ private final HBaseAdmin admin;
+
+ public PerfEvalCallable(HBaseAdmin admin, String argv) {
+ // TODO: this API is awkward, should take HConnection, not HBaseAdmin
+ this.admin = admin;
+ this.argv.addAll(Arrays.asList(argv.split(" ")));
+ LOG.debug("Created PerformanceEvaluationCallable with args: " + argv);
+ }
+
+ @Override
+ public TimingResult call() throws Exception {
+ PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
+ PerformanceEvaluation.checkTable(admin, opts);
+ long numRows = opts.totalRows;
+ long elapsedTime;
+ if (opts.nomapred) {
+ elapsedTime = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
+ } else {
+ Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
+ Counters counters = job.getCounters();
+ numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
+ elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
+ }
+ return new TimingResult(numRows, elapsedTime);
+ }
+ }
+
+ /**
+ * Record the results from a single {@link PerformanceEvaluation} job run.
+ */
+ static class TimingResult {
+ public long numRows;
+ public long elapsedTime;
+
+ public TimingResult(long numRows, long elapsedTime) {
+ this.numRows = numRows;
+ this.elapsedTime = elapsedTime;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("numRows", numRows)
+ .add("elapsedTime", elapsedTime)
+ .toString();
+ }
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ Configuration conf = util.getConfiguration();
+
+ // sanity check cluster
+ // TODO: this should reach out to master and verify online state instead
+ assertEquals("Master must be configured with StochasticLoadBalancer",
+ "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer",
+ conf.get("hbase.master.loadbalancer.class"));
+ // TODO: this should reach out to master and verify online state instead
+ assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.",
+ conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0);
+
+ // enable client-side settings
+ conf.setBoolean(RpcClient.ALLOWS_INTERRUPTS, true);
+ // TODO: expose these settings to CLI override
+ conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout);
+ conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout);
+ }
+
+ @Override
+ public void setUpCluster() throws Exception {
+ util = getTestingUtil(getConf());
+ util.initializeCluster(clusterSize);
+ }
+
+ @Override
+ public void setUpMonkey() throws Exception {
+ Policy p = new PeriodicRandomActionPolicy(sleepTime,
+ new RestartRsHoldingTableAction(sleepTime, tableName.getNameAsString()),
+ new MoveRandomRegionOfTableAction(tableName.getNameAsString()));
+ this.monkey = new PolicyBasedChaosMonkey(util, p);
+ // don't start monkey right away
+ }
+
+ @Override
+ protected void addOptions() {
+ addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '"
+ + TABLE_NAME_DEFAULT + "'");
+ addOptWithArg(SLEEP_TIME_KEY, "How long the monkey sleeps between actions. Default: "
+ + SLEEP_TIME_DEFAULT);
+ addOptNoArg(NOMAPRED_KEY,
+ "Run multiple clients using threads (rather than use mapreduce)");
+ addOptWithArg(REPLICA_COUNT_KEY, "Number of region replicas. Default: "
+ + REPLICA_COUNT_DEFAULT);
+ addOptWithArg(PRIMARY_TIMEOUT_KEY, "Overrides hbase.client.primaryCallTimeout. Default: "
+ + PRIMARY_TIMEOUT_DEFAULT + " (10ms)");
+ addOptWithArg(NUM_RS_KEY, "Specify the number of RegionServers to use. Default: "
+ + NUM_RS_DEFAULT);
+ }
+
+ @Override
+ protected void processOptions(CommandLine cmd) {
+ tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT));
+ sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT));
+ nomapred = cmd.hasOption(NOMAPRED_KEY);
+ replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT));
+ primaryTimeout =
+ Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT));
+ clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT));
+ LOG.debug(Objects.toStringHelper("Parsed Options")
+ .add(TABLE_NAME_KEY, tableName)
+ .add(SLEEP_TIME_KEY, sleepTime)
+ .add(NOMAPRED_KEY, nomapred)
+ .add(REPLICA_COUNT_KEY, replicaCount)
+ .add(PRIMARY_TIMEOUT_KEY, primaryTimeout)
+ .add(NUM_RS_KEY, clusterSize)
+ .toString());
+ }
+
+ @Override
+ public int runTestFromCommandLine() throws Exception {
+ test();
+ return 0;
+ }
+
+ @Override
+ public String getTablename() {
+ return tableName.getNameAsString();
+ }
+
+ @Override
+ protected Set<String> getColumnFamilies() {
+ return null;
+ }
+
+ /**
+ * Modify a table, synchronously. Waiting logic similar to that of {@code admin.rb#alter_status}.
+ */
+ private static void modifyTableSync(HBaseAdmin admin, HTableDescriptor desc) throws Exception {
+ admin.modifyTable(desc.getTableName(), desc);
+ Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
+ setFirst(0);
+ setSecond(0);
+ }};
+ for (int i = 0; status.getFirst() != 0 && i < 500; i++) { // wait up to 500 seconds
+ status = admin.getAlterStatus(desc.getTableName());
+ if (status.getSecond() != 0) {
+ LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
+ + " regions updated.");
+ Thread.sleep(1 * 1000l);
+ } else {
+ LOG.debug("All regions updated.");
+ }
+ }
+ if (status.getSecond() != 0) {
+ throw new Exception("Failed to update replica count after 500 seconds.");
+ }
+ }
+
+ /**
+ * Set the number of Region replicas.
+ */
+ private static void setReplicas(HBaseAdmin admin, TableName table, int replicaCount)
+ throws Exception {
+ admin.disableTable(table);
+ HTableDescriptor desc = admin.getTableDescriptor(table);
+ desc.setRegionReplication(replicaCount);
+ modifyTableSync(admin, desc);
+ admin.enableTable(table);
+ }
+
+ public void test() throws Exception {
+ int maxIters = 3;
+ String mr = nomapred ? "--nomapred" : "";
+ String replicas = "--replicas=" + replicaCount;
+ // TODO: splits disabled until "phase 2" is complete.
+ String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName();
+ String writeOpts = format("%s %s --table=%s --presplit=16 sequentialWrite 4",
+ mr, splitPolicy, tableName);
+ String readOpts =
+ format("%s --table=%s --latency --sampleRate=0.1 randomRead 4", mr, tableName);
+ String replicaReadOpts = format("%s %s", replicas, readOpts);
+
+ ArrayList<TimingResult> resultsWithoutReplica = new ArrayList<TimingResult>(maxIters);
+ ArrayList<TimingResult> resultsWithReplica = new ArrayList<TimingResult>(maxIters);
+
+ // create/populate the table, replicas disabled
+ LOG.debug("Populating table.");
+ new PerfEvalCallable(util.getHBaseAdmin(), writeOpts).call();
+
+ // one last sanity check, then send in the clowns!
+ assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.",
+ DisabledRegionSplitPolicy.class.getName(),
+ util.getHBaseAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName());
+ startMonkey();
+
+ // collect a baseline without region replicas.
+ for (int i = 0; i < maxIters; i++) {
+ LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters);
+ resultsWithoutReplica.add(new PerfEvalCallable(util.getHBaseAdmin(), readOpts).call());
+ // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
+ Thread.sleep(5000l);
+ }
+
+ // disable monkey, enable region replicas, enable monkey
+ cleanUpMonkey("Altering table.");
+ LOG.debug("Altering " + tableName + " replica count to " + replicaCount);
+ setReplicas(util.getHBaseAdmin(), tableName, replicaCount);
+ setUpMonkey();
+ startMonkey();
+
+ // run test with region replicas.
+ for (int i = 0; i < maxIters; i++) {
+ LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters);
+ resultsWithReplica.add(new PerfEvalCallable(util.getHBaseAdmin(), replicaReadOpts).call());
+ // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
+ Thread.sleep(5000l);
+ }
+
+ DescriptiveStatistics withoutReplicaStats = new DescriptiveStatistics();
+ for (TimingResult tr : resultsWithoutReplica) {
+ withoutReplicaStats.addValue(tr.elapsedTime);
+ }
+ DescriptiveStatistics withReplicaStats = new DescriptiveStatistics();
+ for (TimingResult tr : resultsWithReplica) {
+ withReplicaStats.addValue(tr.elapsedTime);
+ }
+
+ LOG.info(Objects.toStringHelper("testName")
+ .add("withoutReplicas", resultsWithoutReplica)
+ .add("withReplicas", resultsWithReplica)
+ .add("withoutReplicasMean", withoutReplicaStats.getMean())
+ .add("withReplicasMean", withReplicaStats.getMean())
+ .toString());
+
+ assertTrue(
+ "Running with region replicas under chaos should be as fast or faster than without. "
+ + "withReplicas.mean: " + withReplicaStats.getMean() + "ms "
+ + "withoutReplicas.mean: " + withoutReplicaStats.getMean() + "ms.",
+ withReplicaStats.getMean() <= withoutReplicaStats.getMean());
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ IntegrationTestingUtility.setUseDistributedCluster(conf);
+ int status = ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args);
+ System.exit(status);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d313103a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index 227fc1a..d9fa0b7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -30,7 +30,9 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.Queue;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
@@ -39,12 +41,15 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -84,7 +89,6 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.codehaus.jackson.map.ObjectMapper;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.stats.UniformSample;
import com.yammer.metrics.stats.Snapshot;
@@ -99,16 +103,17 @@ import org.htrace.impl.ProbabilitySampler;
* client that steps through one of a set of hardcoded tests or 'experiments'
* (e.g. a random reads test, a random writes test, etc.). Pass on the
* command-line which test to run and how many clients are participating in
- * this experiment. Run <code>java PerformanceEvaluation --help</code> to
- * obtain usage.
+ * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
*
* <p>This class sets up and runs the evaluation programs described in
* Section 7, <i>Performance Evaluation</i>, of the <a
* href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
* paper, pages 8-10.
*
- * <p>If number of clients > 1, we start up a MapReduce job. Each map task
- * runs an individual client. Each client does about 1GB of data.
+ * <p>By default, runs as a mapreduce job where each mapper runs a single test
+ * client. Can also run as a non-mapreduce, multithreaded application by
+ * specifying {@code --nomapred}. Each client does about 1GB of data, unless
+ * specified otherwise.
*/
public class PerformanceEvaluation extends Configured implements Tool {
protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
@@ -133,10 +138,35 @@ public class PerformanceEvaluation extends Configured implements Tool {
private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
private static final TestOptions DEFAULT_OPTS = new TestOptions();
- protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
-
+ private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<String, CmdDescriptor>();
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
+ static {
+ addCommandDescriptor(RandomReadTest.class, "randomRead",
+ "Run random read test");
+ addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
+ "Run random seek and scan 100 test");
+ addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
+ "Run random seek scan with both start and stop row (max 10 rows)");
+ addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
+ "Run random seek scan with both start and stop row (max 100 rows)");
+ addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
+ "Run random seek scan with both start and stop row (max 1000 rows)");
+ addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
+ "Run random seek scan with both start and stop row (max 10000 rows)");
+ addCommandDescriptor(RandomWriteTest.class, "randomWrite",
+ "Run random write test");
+ addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
+ "Run sequential read test");
+ addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
+ "Run sequential write test");
+ addCommandDescriptor(ScanTest.class, "scan",
+ "Run scan test (read every row)");
+ addCommandDescriptor(FilteredScanTest.class, "filterScan",
+ "Run scan test using a filter to find a specific row based on its value " +
+ "(make sure to use --rows=20)");
+ }
+
/**
* Enum for map metrics. Keep it out here rather than inside in the Map
* inner-class so we can find associated properties.
@@ -154,37 +184,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
*/
public PerformanceEvaluation(final Configuration conf) {
super(conf);
-
- addCommandDescriptor(RandomReadTest.class, "randomRead",
- "Run random read test");
- addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
- "Run random seek and scan 100 test");
- addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
- "Run random seek scan with both start and stop row (max 10 rows)");
- addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
- "Run random seek scan with both start and stop row (max 100 rows)");
- addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
- "Run random seek scan with both start and stop row (max 1000 rows)");
- addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
- "Run random seek scan with both start and stop row (max 10000 rows)");
- addCommandDescriptor(RandomWriteTest.class, "randomWrite",
- "Run random write test");
- addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
- "Run sequential read test");
- addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
- "Run sequential write test");
- addCommandDescriptor(ScanTest.class, "scan",
- "Run scan test (read every row)");
- addCommandDescriptor(FilteredScanTest.class, "filterScan",
- "Run scan test using a filter to find a specific row based on it's value " +
- "(make sure to use --rows=20)");
}
- protected void addCommandDescriptor(Class<? extends Test> cmdClass,
+ protected static void addCommandDescriptor(Class<? extends Test> cmdClass,
String name, String description) {
- CmdDescriptor cmdDescriptor =
- new CmdDescriptor(cmdClass, name, description);
- commands.put(name, cmdDescriptor);
+ CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
+ COMMANDS.put(name, cmdDescriptor);
}
/**
@@ -235,10 +240,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
+ @Override
protected void map(LongWritable key, Text value, final Context context)
throws IOException, InterruptedException {
Status status = new Status() {
+ @Override
public void setStatus(String msg) {
context.setStatus(msg);
}
@@ -260,35 +267,62 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
/*
- * If table does not already exist, create.
- * @param c Client to use checking.
- * @return True if we created the table.
- * @throws IOException
+ * If table does not already exist, create. Also create a table when
+ * {@code opts.presplitRegions} is specified or when the existing table's
+ * region replica count doesn't match {@code opts.replicas}.
*/
- private static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
- HTableDescriptor tableDescriptor = getTableDescriptor(opts);
- if (opts.presplitRegions > 0) {
- // presplit requested
- if (admin.tableExists(tableDescriptor.getTableName())) {
- admin.disableTable(tableDescriptor.getTableName());
- admin.deleteTable(tableDescriptor.getTableName());
+ static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
+ TableName tableName = TableName.valueOf(opts.tableName);
+ boolean needsDelete = false, exists = admin.tableExists(tableName);
+ boolean isReadCmd = opts.cmdName.toLowerCase().contains("read")
+ || opts.cmdName.toLowerCase().contains("scan");
+ if (!exists && isReadCmd) {
+ throw new IllegalStateException(
+ "Must specify an existing table for read commands. Run a write command first.");
+ }
+ HTableDescriptor desc =
+ exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null;
+ byte[][] splits = getSplits(opts);
+
+ // recreate the table when user has requested presplit or when existing
+ // {RegionSplitPolicy,replica count} does not match requested.
+ if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
+ || (!isReadCmd && desc != null && desc.getRegionSplitPolicyClassName() != opts.splitPolicy)
+ || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)) {
+ needsDelete = true;
+ // wait, why did it delete my table?!?
+ LOG.debug(Objects.toStringHelper("needsDelete")
+ .add("needsDelete", needsDelete)
+ .add("isReadCmd", isReadCmd)
+ .add("exists", exists)
+ .add("desc", desc)
+ .add("presplit", opts.presplitRegions)
+ .add("splitPolicy", opts.splitPolicy)
+ .add("replicas", opts.replicas));
+ }
+
+ // remove an existing table
+ if (needsDelete) {
+ if (admin.isTableEnabled(tableName)) {
+ admin.disableTable(tableName);
}
+ admin.deleteTable(tableName);
+ }
- byte[][] splits = getSplits(opts);
- for (int i=0; i < splits.length; i++) {
- LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
- }
- admin.createTable(tableDescriptor, splits);
- LOG.info ("Table created with " + opts.presplitRegions + " splits");
- }
- else {
- boolean tableExists = admin.tableExists(tableDescriptor.getTableName());
- if (!tableExists) {
- admin.createTable(tableDescriptor);
- LOG.info("Table " + tableDescriptor + " created");
+ // table creation is necessary
+ if (!exists || needsDelete) {
+ desc = getTableDescriptor(opts);
+ if (splits != null) {
+ if (LOG.isDebugEnabled()) {
+ for (int i = 0; i < splits.length; i++) {
+ LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
+ }
+ }
}
+ admin.createTable(desc, splits);
+ LOG.info("Table " + desc + " created");
}
- return admin.tableExists(tableDescriptor.getTableName());
+ return admin.tableExists(tableName);
}
/**
@@ -304,6 +338,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
family.setInMemory(true);
}
desc.addFamily(family);
+ if (opts.replicas != DEFAULT_OPTS.replicas) {
+ desc.setRegionReplication(opts.replicas);
+ }
+ if (opts.splitPolicy != DEFAULT_OPTS.splitPolicy) {
+ desc.setRegionSplitPolicyClassName(opts.splitPolicy);
+ }
return desc;
}
@@ -311,8 +351,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
* generates splits based on total number of rows and specified split regions
*/
protected static byte[][] getSplits(TestOptions opts) {
- if (opts.presplitRegions == 0)
- return new byte [0][];
+ if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
+ return null;
int numSplitPoints = opts.presplitRegions - 1;
byte[][] splits = new byte[numSplitPoints][];
@@ -329,8 +369,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
* @param cmd Command to run.
* @throws IOException
*/
- private void doLocalClients(final Class<? extends Test> cmd, final TestOptions opts)
+ static long doLocalClients(final TestOptions opts, final Configuration conf)
throws IOException, InterruptedException {
+ final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
+ assert cmd != null;
Future<Long>[] threads = new Future[opts.numClientThreads];
long[] timings = new long[opts.numClientThreads];
ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
@@ -342,7 +384,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
public Long call() throws Exception {
TestOptions threadOpts = new TestOptions(opts);
if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
- long elapsedTime = runOneClient(cmd, getConf(), threadOpts, new Status() {
+ long elapsedTime = runOneClient(cmd, conf, threadOpts, new Status() {
+ @Override
public void setStatus(final String msg) throws IOException {
LOG.info(msg);
}
@@ -370,9 +413,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
total += timing;
}
LOG.info("[" + test + "]"
- + "\tMin: " + timings[0] + "ms"
- + "\tMax: " + timings[timings.length - 1] + "ms"
- + "\tAvg: " + (total / timings.length) + "ms");
+ + "\tMin: " + timings[0] + "ms"
+ + "\tMax: " + timings[timings.length - 1] + "ms"
+ + "\tAvg: " + (total / timings.length) + "ms");
+ return total;
}
/*
@@ -382,15 +426,16 @@ public class PerformanceEvaluation extends Configured implements Tool {
* @param cmd Command to run.
* @throws IOException
*/
- private void doMapReduce(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
- InterruptedException, ClassNotFoundException {
- Configuration conf = getConf();
+ static Job doMapReduce(TestOptions opts, final Configuration conf)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
+ assert cmd != null;
Path inputDir = writeInputFile(conf, opts);
conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
- conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
+ conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
Job job = new Job(conf);
job.setJarByClass(PerformanceEvaluation.class);
- job.setJobName("HBase Performance Evaluation");
+ job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setInputPaths(job, inputDir);
@@ -410,12 +455,13 @@ public class PerformanceEvaluation extends Configured implements Tool {
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
- Histogram.class, // yammer metrics
+ Histogram.class, // yammer metrics
ObjectMapper.class); // jackson-mapper-asl
TableMapReduceUtil.initCredentials(job);
job.waitForCompletion(true);
+ return job;
}
/*
@@ -424,7 +470,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
* @return Directory that contains file written.
* @throws IOException
*/
- private Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
+ private static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
Path inputDir = new Path(jobdir, "inputs");
@@ -491,6 +537,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
* This makes tracking all these arguments a little easier.
*/
static class TestOptions {
+ String cmdName = null;
boolean nomapred = false;
boolean filterAll = false;
int startRow = 0;
@@ -511,6 +558,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
int multiGet = 0;
boolean inMemoryCF = false;
int presplitRegions = 0;
+ int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
+ String splitPolicy = null;
Compression.Algorithm compression = Compression.Algorithm.NONE;
BloomType bloomType = BloomType.ROW;
DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
@@ -521,6 +570,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
public TestOptions() {}
public TestOptions(TestOptions that) {
+ this.cmdName = that.cmdName;
this.nomapred = that.nomapred;
this.startRow = that.startRow;
this.size = that.size;
@@ -540,6 +590,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.multiGet = that.multiGet;
this.inMemoryCF = that.inMemoryCF;
this.presplitRegions = that.presplitRegions;
+ this.replicas = that.replicas;
+ this.splitPolicy = that.splitPolicy;
this.compression = that.compression;
this.blockEncoding = that.blockEncoding;
this.filterAll = that.filterAll;
@@ -712,7 +764,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
if (rs == null || !isRandomValueSize()) return;
for (Result r: rs) updateValueSize(r);
}
-
+
void updateValueSize(final Result r) throws IOException {
if (r == null || !isRandomValueSize()) return;
int size = 0;
@@ -731,7 +783,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
(!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
}
-
+
boolean isRandomValueSize() {
return opts.valueRandom;
}
@@ -825,7 +877,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
valueSize.count() + " measures");
reportHistogram(this.valueSize);
}
-
+
private void reportHistogram(final Histogram h) throws IOException {
Snapshot sn = h.getSnapshot();
status.setStatus(testName + " Min = " + h.min());
@@ -997,10 +1049,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
static class RandomReadTest extends Test {
+ private final Consistency consistency;
private ArrayList<Get> gets;
RandomReadTest(HConnection con, TestOptions options, Status status) {
super(con, options, status);
+ consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
if (opts.multiGet > 0) {
LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
this.gets = new ArrayList<Get>(opts.multiGet);
@@ -1014,6 +1068,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
if (opts.filterAll) {
get.setFilter(new FilterAllFilter());
}
+ get.setConsistency(consistency);
if (LOG.isTraceEnabled()) LOG.trace(get.toString());
if (opts.multiGet > 0) {
this.gets.add(get);
@@ -1313,9 +1368,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
if (admin != null) admin.close();
}
if (opts.nomapred) {
- doLocalClients(cmd, opts);
+ doLocalClients(opts, getConf());
} else {
- doMapReduce(cmd, opts);
+ doMapReduce(opts, getConf());
}
}
@@ -1368,6 +1423,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
"'valueSize'; set on read for stats on size: Default: Not set.");
System.err.println(" period Report every 'period' rows: " +
"Default: opts.perClientRunRows / 10");
+ System.err.println(" multiGet Batch gets together into groups of N. Only supported " +
+ "by randomRead. Default: disabled");
+ System.err.println(" replicas Enable region replica testing. Default: 1.");
+ System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table.");
System.err.println();
System.err.println(" Note: -D properties will be applied to the conf used. ");
System.err.println(" For example: ");
@@ -1375,7 +1434,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
System.err.println(" -Dmapreduce.task.timeout=60000");
System.err.println();
System.err.println("Command:");
- for (CmdDescriptor command : commands.values()) {
+ for (CmdDescriptor command : COMMANDS.values()) {
System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
}
System.err.println();
@@ -1389,215 +1448,234 @@ public class PerformanceEvaluation extends Configured implements Tool {
+ " sequentialWrite 1");
}
- private static int getNumClients(final int start, final String[] args) {
- if(start + 1 > args.length) {
- throw new IllegalArgumentException("must supply the number of clients");
- }
- int N = Integer.parseInt(args[start]);
- if (N < 1) {
- throw new IllegalArgumentException("Number of clients must be > 1");
- }
- return N;
- }
+ /**
+ * Parse options passed in via an arguments array. Assumes that array has been split
+ * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
+ * in the queue at the conclusion of this method call. It's up to the caller to deal
+ * with these unrecognized arguments.
+ */
+ static TestOptions parseOpts(Queue<String> args) {
+ TestOptions opts = new TestOptions();
+
+ String cmd = null;
+ while ((cmd = args.poll()) != null) {
+ if (cmd.equals("-h") || cmd.startsWith("--h")) {
+ // place item back onto queue so that caller knows parsing was incomplete
+ args.add(cmd);
+ break;
+ }
- public int run(String[] args) throws Exception {
- // Process command-line args. TODO: Better cmd-line processing
- // (but hopefully something not as painful as cli options).
- int errCode = -1;
- if (args.length < 1) {
- printUsage();
- return errCode;
- }
+ final String nmr = "--nomapred";
+ if (cmd.startsWith(nmr)) {
+ opts.nomapred = true;
+ continue;
+ }
- try {
- // MR-NOTE: if you are adding a property that is used to control an operation
- // like put(), get(), scan(), ... you must also add it as part of the MR
- // input, take a look at writeInputFile().
- // Then you must adapt the LINE_PATTERN input regex,
- // and parse the argument, take a look at PEInputFormat.getSplits().
-
- TestOptions opts = new TestOptions();
-
- for (int i = 0; i < args.length; i++) {
- String cmd = args[i];
- if (cmd.equals("-h") || cmd.startsWith("--h")) {
- printUsage();
- errCode = 0;
- break;
- }
+ final String rows = "--rows=";
+ if (cmd.startsWith(rows)) {
+ opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
+ continue;
+ }
- final String nmr = "--nomapred";
- if (cmd.startsWith(nmr)) {
- opts.nomapred = true;
- continue;
- }
+ final String sampleRate = "--sampleRate=";
+ if (cmd.startsWith(sampleRate)) {
+ opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
+ continue;
+ }
- final String rows = "--rows=";
- if (cmd.startsWith(rows)) {
- opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
- continue;
- }
+ final String table = "--table=";
+ if (cmd.startsWith(table)) {
+ opts.tableName = cmd.substring(table.length());
+ continue;
+ }
- final String startRow = "--startRow=";
- if (cmd.startsWith(startRow)) {
- opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
- continue;
- }
+ final String startRow = "--startRow=";
+ if (cmd.startsWith(startRow)) {
+ opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
+ continue;
+ }
- final String sampleRate = "--sampleRate=";
- if (cmd.startsWith(sampleRate)) {
- opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
- continue;
- }
+ final String compress = "--compress=";
+ if (cmd.startsWith(compress)) {
+ opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
+ continue;
+ }
- final String traceRate = "--traceRate=";
- if (cmd.startsWith(traceRate)) {
- opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
- continue;
- }
+ final String traceRate = "--traceRate=";
+ if (cmd.startsWith(traceRate)) {
+ opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
+ continue;
+ }
- final String table = "--table=";
- if (cmd.startsWith(table)) {
- opts.tableName = cmd.substring(table.length());
- continue;
- }
+ final String blockEncoding = "--blockEncoding=";
+ if (cmd.startsWith(blockEncoding)) {
+ opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
+ continue;
+ }
- final String compress = "--compress=";
- if (cmd.startsWith(compress)) {
- opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
- continue;
- }
+ final String flushCommits = "--flushCommits=";
+ if (cmd.startsWith(flushCommits)) {
+ opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
+ continue;
+ }
- final String blockEncoding = "--blockEncoding=";
- if (cmd.startsWith(blockEncoding)) {
- opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
- continue;
- }
+ final String writeToWAL = "--writeToWAL=";
+ if (cmd.startsWith(writeToWAL)) {
+ opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
+ continue;
+ }
- final String flushCommits = "--flushCommits=";
- if (cmd.startsWith(flushCommits)) {
- opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
- continue;
- }
+ final String presplit = "--presplit=";
+ if (cmd.startsWith(presplit)) {
+ opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
+ continue;
+ }
- final String writeToWAL = "--writeToWAL=";
- if (cmd.startsWith(writeToWAL)) {
- opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
- continue;
- }
+ final String inMemory = "--inmemory=";
+ if (cmd.startsWith(inMemory)) {
+ opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
+ continue;
+ }
- final String autoFlush = "--autoFlush=";
- if (cmd.startsWith(autoFlush)) {
- opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
- continue;
- }
+ final String autoFlush = "--autoFlush=";
+ if (cmd.startsWith(autoFlush)) {
+ opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
+ continue;
+ }
- final String onceCon = "--oneCon=";
- if (cmd.startsWith(onceCon)) {
- opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
- continue;
- }
+ final String onceCon = "--oneCon=";
+ if (cmd.startsWith(onceCon)) {
+ opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
+ continue;
+ }
- final String presplit = "--presplit=";
- if (cmd.startsWith(presplit)) {
- opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
- continue;
- }
+ final String latency = "--latency";
+ if (cmd.startsWith(latency)) {
+ opts.reportLatency = true;
+ continue;
+ }
- final String inMemory = "--inmemory=";
- if (cmd.startsWith(inMemory)) {
- opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
- continue;
- }
+ final String multiGet = "--multiGet=";
+ if (cmd.startsWith(multiGet)) {
+ opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
+ continue;
+ }
- final String latency = "--latency";
- if (cmd.startsWith(latency)) {
- opts.reportLatency = true;
- continue;
- }
+ final String useTags = "--usetags=";
+ if (cmd.startsWith(useTags)) {
+ opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
+ continue;
+ }
- final String multiGet = "--multiGet=";
- if (cmd.startsWith(multiGet)) {
- opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
- continue;
- }
+ final String noOfTags = "--nooftags=";
+ if (cmd.startsWith(noOfTags)) {
+ opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
+ continue;
+ }
- final String useTags = "--usetags=";
- if (cmd.startsWith(useTags)) {
- opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
- continue;
- }
+ final String replicas = "--replicas=";
+ if (cmd.startsWith(replicas)) {
+ opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
+ continue;
+ }
- final String noOfTags = "--nooftags=";
- if (cmd.startsWith(noOfTags)) {
- opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
- continue;
- }
+ final String filterOutAll = "--filterAll";
+ if (cmd.startsWith(filterOutAll)) {
+ opts.filterAll = true;
+ continue;
+ }
- final String filterOutAll = "--filterAll";
- if (cmd.startsWith(filterOutAll)) {
- opts.filterAll = true;
- continue;
- }
+ final String size = "--size=";
+ if (cmd.startsWith(size)) {
+ opts.size = Float.parseFloat(cmd.substring(size.length()));
+ continue;
+ }
- final String size = "--size=";
- if (cmd.startsWith(size)) {
- opts.size = Float.parseFloat(cmd.substring(size.length()));
- continue;
- }
+ final String splitPolicy = "--splitPolicy=";
+ if (cmd.startsWith(splitPolicy)) {
+ opts.splitPolicy = cmd.substring(splitPolicy.length());
+ continue;
+ }
- final String bloomFilter = "--bloomFilter";
- if (cmd.startsWith(bloomFilter)) {
- opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
- continue;
- }
+ final String bloomFilter = "--bloomFilter";
+ if (cmd.startsWith(bloomFilter)) {
+ opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
+ continue;
+ }
- final String valueSize = "--valueSize=";
- if (cmd.startsWith(valueSize)) {
- opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
- continue;
- }
+ final String valueSize = "--valueSize=";
+ if (cmd.startsWith(valueSize)) {
+ opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
+ continue;
+ }
- final String valueRandom = "--valueRandom";
- if (cmd.startsWith(valueRandom)) {
- opts.valueRandom = true;
- continue;
- }
+ final String valueRandom = "--valueRandom";
+ if (cmd.startsWith(valueRandom)) {
+ opts.valueRandom = true;
+ continue;
+ }
- final String period = "--period=";
- if (cmd.startsWith(period)) {
- opts.period = Integer.parseInt(cmd.substring(period.length()));
- continue;
- }
+ final String period = "--period=";
+ if (cmd.startsWith(period)) {
+ opts.period = Integer.parseInt(cmd.substring(period.length()));
+ continue;
+ }
- Class<? extends Test> cmdClass = determineCommandClass(cmd);
- if (cmdClass != null) {
- opts.numClientThreads = getNumClients(i + 1, args);
- if (opts.size != DEFAULT_OPTS.size &&
+ if (isCommandClass(cmd)) {
+ opts.cmdName = cmd;
+ opts.numClientThreads = Integer.parseInt(args.remove());
+ int rowsPerGB = ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize);
+ if (opts.size != DEFAULT_OPTS.size &&
opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
- throw new IllegalArgumentException(rows + " and " + size +
- " are mutually exclusive arguments.");
- }
- // Calculate how many rows per gig. If random value size presume that that half the max
- // is average row size.
- int rowsPerGB = ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize);
- if (opts.size != DEFAULT_OPTS.size) {
- // total size in GB specified
- opts.totalRows = (int) opts.size * rowsPerGB;
- opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
- } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
- // number of rows specified
- opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
- opts.size = opts.totalRows / rowsPerGB;
- }
- runTest(cmdClass, opts);
- errCode = 0;
- break;
+ throw new IllegalArgumentException(rows + " and " + size + " are mutually exclusive arguments.");
+ }
+ if (opts.size != DEFAULT_OPTS.size) {
+ // total size in GB specified
+ opts.totalRows = (int) opts.size * rowsPerGB;
+ opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
+ } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
+ // number of rows specified
+ opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
+ opts.size = opts.totalRows / rowsPerGB;
}
+ break;
+ }
+ }
+ return opts;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ // Process command-line args. TODO: Better cmd-line processing
+ // (but hopefully something not as painful as cli options).
+ int errCode = -1;
+ if (args.length < 1) {
+ printUsage();
+ return errCode;
+ }
+
+ try {
+ LinkedList<String> argv = new LinkedList<String>();
+ argv.addAll(Arrays.asList(args));
+ TestOptions opts = parseOpts(argv);
+ // args remaining, print help and exit
+ if (!argv.isEmpty()) {
+ errCode = 0;
printUsage();
- break;
}
+
+ // must run at least 1 client
+ if (opts.numClientThreads <= 0) {
+ throw new IllegalArgumentException("Number of clients must be > 0");
+ }
+
+ Class<? extends Test> cmdClass = determineCommandClass(opts.cmdName);
+ if (cmdClass != null) {
+ runTest(cmdClass, opts);
+ errCode = 0;
+ }
+
} catch (Exception e) {
e.printStackTrace();
}
@@ -1605,8 +1683,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
return errCode;
}
- private Class<? extends Test> determineCommandClass(String cmd) {
- CmdDescriptor descriptor = commands.get(cmd);
+ private static boolean isCommandClass(String cmd) {
+ return COMMANDS.containsKey(cmd);
+ }
+
+ private static Class<? extends Test> determineCommandClass(String cmd) {
+ CmdDescriptor descriptor = COMMANDS.get(cmd);
return descriptor != null ? descriptor.getCmdClass() : null;
}