You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2016/02/11 02:47:16 UTC
[2/3] hbase git commit: HBASE-15246 Backport branch-1 HBasePerformanceEvaluation to 0.98
http://git-wip-us.apache.org/repos/asf/hbase/blob/38ce707b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index ffa5150..1c84c30 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -30,7 +30,9 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.Queue;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
@@ -39,26 +41,32 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import com.google.common.base.Objects;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.filter.FilterList;
@@ -67,12 +75,11 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Hash;
-import org.apache.hadoop.hbase.util.MurmurHash;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.trace.SpanReceiverHost;
+import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -82,39 +89,49 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.cloudera.htrace.Sampler;
+import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
+import org.cloudera.htrace.impl.ProbabilitySampler;
import org.codehaus.jackson.map.ObjectMapper;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.stats.UniformSample;
+import com.yammer.metrics.stats.Snapshot;
/**
* Script used evaluating HBase performance and scalability. Runs a HBase
* client that steps through one of a set of hardcoded tests or 'experiments'
* (e.g. a random reads test, a random writes test, etc.). Pass on the
* command-line which test to run and how many clients are participating in
- * this experiment. Run <code>java PerformanceEvaluation --help</code> to
- * obtain usage.
+ * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
*
* <p>This class sets up and runs the evaluation programs described in
* Section 7, <i>Performance Evaluation</i>, of the <a
* href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
* paper, pages 8-10.
*
- * <p>If number of clients > 1, we start up a MapReduce job. Each map task
- * runs an individual client. Each client does about 1GB of data.
+ * <p>By default, runs as a mapreduce job where each mapper runs a single test
+ * client. Can also run as a non-mapreduce, multithreaded application by
+ * specifying {@code --nomapred}. Each client does about 1GB of data, unless
+ * specified otherwise.
*/
public class PerformanceEvaluation extends Configured implements Tool {
- protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
+ private static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+ static {
+ MAPPER.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
+ }
public static final String TABLE_NAME = "TestTable";
public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
- public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
- public static final int VALUE_LENGTH = 1000;
+ public static final byte [] COLUMN_ZERO = Bytes.toBytes("" + 0);
+ public static final byte [] QUALIFIER_NAME = COLUMN_ZERO;
+ public static final int DEFAULT_VALUE_LENGTH = 1000;
public static final int ROW_LENGTH = 26;
private static final int ONE_GB = 1024 * 1024 * 1000;
- private static final int ROWS_PER_GB = ONE_GB / VALUE_LENGTH;
+ private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
// TODO : should we make this configurable
private static final int TAG_LENGTH = 256;
private static final DecimalFormat FMT = new DecimalFormat("0.##");
@@ -123,10 +140,45 @@ public class PerformanceEvaluation extends Configured implements Tool {
private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
private static final TestOptions DEFAULT_OPTS = new TestOptions();
- protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
-
+ private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<String, CmdDescriptor>();
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
+ static {
+ addCommandDescriptor(RandomReadTest.class, "randomRead",
+ "Run random read test");
+ addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
+ "Run random seek and scan 100 test");
+ addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
+ "Run random seek scan with both start and stop row (max 10 rows)");
+ addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
+ "Run random seek scan with both start and stop row (max 100 rows)");
+ addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
+ "Run random seek scan with both start and stop row (max 1000 rows)");
+ addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
+ "Run random seek scan with both start and stop row (max 10000 rows)");
+ addCommandDescriptor(RandomWriteTest.class, "randomWrite",
+ "Run random write test");
+ addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
+ "Run sequential read test");
+ addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
+ "Run sequential write test");
+ addCommandDescriptor(ScanTest.class, "scan",
+ "Run scan test (read every row)");
+ addCommandDescriptor(FilteredScanTest.class, "filterScan",
+ "Run scan test using a filter to find a specific row based on it's value " +
+ "(make sure to use --rows=20)");
+ addCommandDescriptor(IncrementTest.class, "increment",
+ "Increment on each row; clients overlap on keyspace so some concurrent operations");
+ addCommandDescriptor(AppendTest.class, "append",
+ "Append on each row; clients overlap on keyspace so some concurrent operations");
+ addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
+ "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
+ addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
+ "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
+ addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
+ "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
+ }
+
/**
* Enum for map metrics. Keep it out here rather than inside in the Map
* inner-class so we can find associated properties.
@@ -138,42 +190,40 @@ public class PerformanceEvaluation extends Configured implements Tool {
ROWS
}
+ protected static class RunResult implements Comparable<RunResult> {
+ public RunResult(long duration, Histogram hist) {
+ this.duration = duration;
+ this.hist = hist;
+ }
+
+ public final long duration;
+ public final Histogram hist;
+
+ @Override
+ public String toString() {
+ return Long.toString(duration);
+ }
+
+ @Override public int compareTo(RunResult o) {
+ if (this.duration == o.duration) {
+ return 0;
+ }
+ return this.duration > o.duration ? 1 : -1;
+ }
+ }
+
/**
* Constructor
* @param conf Configuration object
*/
public PerformanceEvaluation(final Configuration conf) {
super(conf);
-
- addCommandDescriptor(RandomReadTest.class, "randomRead",
- "Run random read test");
- addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
- "Run random seek and scan 100 test");
- addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
- "Run random seek scan with both start and stop row (max 10 rows)");
- addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
- "Run random seek scan with both start and stop row (max 100 rows)");
- addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
- "Run random seek scan with both start and stop row (max 1000 rows)");
- addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
- "Run random seek scan with both start and stop row (max 10000 rows)");
- addCommandDescriptor(RandomWriteTest.class, "randomWrite",
- "Run random write test");
- addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
- "Run sequential read test");
- addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
- "Run sequential write test");
- addCommandDescriptor(ScanTest.class, "scan",
- "Run scan test (read every row)");
- addCommandDescriptor(FilteredScanTest.class, "filterScan",
- "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)");
}
- protected void addCommandDescriptor(Class<? extends Test> cmdClass,
+ protected static void addCommandDescriptor(Class<? extends Test> cmdClass,
String name, String description) {
- CmdDescriptor cmdDescriptor =
- new CmdDescriptor(cmdClass, name, description);
- commands.put(name, cmdDescriptor);
+ CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
+ COMMANDS.put(name, cmdDescriptor);
}
/**
@@ -200,7 +250,6 @@ public class PerformanceEvaluation extends Configured implements Tool {
public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
private Class<? extends Test> cmd;
- private PerformanceEvaluation pe;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
@@ -211,8 +260,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
Class<? extends PerformanceEvaluation> peClass =
forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
try {
- this.pe = peClass.getConstructor(Configuration.class)
- .newInstance(context.getConfiguration());
+ peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
} catch (Exception e) {
throw new IllegalStateException("Could not instantiate PE instance", e);
}
@@ -226,10 +274,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
+ @Override
protected void map(LongWritable key, Text value, final Context context)
throws IOException, InterruptedException {
Status status = new Status() {
+ @Override
public void setStatus(String msg) {
context.setStatus(msg);
}
@@ -238,55 +288,80 @@ public class PerformanceEvaluation extends Configured implements Tool {
ObjectMapper mapper = new ObjectMapper();
TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
Configuration conf = HBaseConfiguration.create(context.getConfiguration());
+ final HConnection con = HConnectionManager.createConnection(conf);
// Evaluation task
- long elapsedTime = this.pe.runOneClient(this.cmd, conf, opts, status);
+ RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, opts, status);
// Collect how much time the thing took. Report as map output and
// to the ELAPSED_TIME counter.
- context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
+ context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
- context.write(new LongWritable(opts.startRow), new LongWritable(elapsedTime));
+ context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
context.progress();
}
}
/*
- * If table does not already exist, create.
- * @param c Client to use checking.
- * @return True if we created the table.
- * @throws IOException
+ * If table does not already exist, create. Also create a table when
+ * {@code opts.presplitRegions} is specified.
*/
- private static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
- HTableDescriptor tableDescriptor = getTableDescriptor(opts);
- if (opts.presplitRegions > 0) {
- // presplit requested
- if (admin.tableExists(tableDescriptor.getTableName())) {
- admin.disableTable(tableDescriptor.getTableName());
- admin.deleteTable(tableDescriptor.getTableName());
- }
-
- byte[][] splits = getSplits(opts);
- for (int i=0; i < splits.length; i++) {
- LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
+ static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
+ TableName tableName = TableName.valueOf(opts.tableName);
+ boolean needsDelete = false, exists = admin.tableExists(tableName);
+ boolean isReadCmd = opts.cmdName.toLowerCase().contains("read")
+ || opts.cmdName.toLowerCase().contains("scan");
+ if (!exists && isReadCmd) {
+ throw new IllegalStateException(
+ "Must specify an existing table for read commands. Run a write command first.");
+ }
+ HTableDescriptor desc =
+ exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : null;
+ byte[][] splits = getSplits(opts);
+
+ // recreate the table when user has requested presplit or when existing
+ // {RegionSplitPolicy,replica count} does not match requested.
+ if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
+ || (!isReadCmd && desc != null && desc.getRegionSplitPolicyClassName() != opts.splitPolicy)) {
+ needsDelete = true;
+ // wait, why did it delete my table?!?
+ LOG.debug(Objects.toStringHelper("needsDelete")
+ .add("needsDelete", needsDelete)
+ .add("isReadCmd", isReadCmd)
+ .add("exists", exists)
+ .add("desc", desc)
+ .add("presplit", opts.presplitRegions)
+ .add("splitPolicy", opts.splitPolicy));
+ }
+
+ // remove an existing table
+ if (needsDelete) {
+ if (admin.isTableEnabled(tableName)) {
+ admin.disableTable(tableName);
}
- admin.createTable(tableDescriptor, splits);
- LOG.info ("Table created with " + opts.presplitRegions + " splits");
+ admin.deleteTable(tableName);
}
- else {
- boolean tableExists = admin.tableExists(tableDescriptor.getTableName());
- if (!tableExists) {
- admin.createTable(tableDescriptor);
- LOG.info("Table " + tableDescriptor + " created");
+
+ // table creation is necessary
+ if (!exists || needsDelete) {
+ desc = getTableDescriptor(opts);
+ if (splits != null) {
+ if (LOG.isDebugEnabled()) {
+ for (int i = 0; i < splits.length; i++) {
+ LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
+ }
+ }
}
+ admin.createTable(desc, splits);
+ LOG.info("Table " + desc + " created");
}
- return admin.tableExists(tableDescriptor.getTableName());
+ return admin.tableExists(tableName);
}
/**
* Create an HTableDescriptor from provided TestOptions.
*/
protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
- HTableDescriptor desc = new HTableDescriptor(opts.tableName);
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(opts.tableName));
HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
family.setDataBlockEncoding(opts.blockEncoding);
family.setCompressionType(opts.compression);
@@ -295,6 +370,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
family.setInMemory(true);
}
desc.addFamily(family);
+ if (opts.splitPolicy != DEFAULT_OPTS.splitPolicy) {
+ desc.setRegionSplitPolicyClassName(opts.splitPolicy);
+ }
return desc;
}
@@ -302,8 +380,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
* generates splits based on total number of rows and specified split regions
*/
protected static byte[][] getSplits(TestOptions opts) {
- if (opts.presplitRegions == 0)
- return new byte [0][];
+ if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
+ return null;
int numSplitPoints = opts.presplitRegions - 1;
byte[][] splits = new byte[numSplitPoints][];
@@ -317,53 +395,61 @@ public class PerformanceEvaluation extends Configured implements Tool {
/*
* Run all clients in this vm each to its own thread.
- * @param cmd Command to run.
- * @throws IOException
*/
- private void doLocalClients(final Class<? extends Test> cmd, final TestOptions opts)
+ static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
throws IOException, InterruptedException {
- Future<Long>[] threads = new Future[opts.numClientThreads];
- long[] timings = new long[opts.numClientThreads];
+ final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
+ assert cmd != null;
+ @SuppressWarnings("unchecked")
+ Future<RunResult>[] threads = new Future[opts.numClientThreads];
+ RunResult[] results = new RunResult[opts.numClientThreads];
ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
+ final HConnection con = HConnectionManager.createConnection(conf);
for (int i = 0; i < threads.length; i++) {
final int index = i;
- threads[i] = pool.submit(new Callable<Long>() {
+ threads[i] = pool.submit(new Callable<RunResult>() {
@Override
- public Long call() throws Exception {
+ public RunResult call() throws Exception {
TestOptions threadOpts = new TestOptions(opts);
- threadOpts.startRow = index * threadOpts.perClientRunRows;
- long elapsedTime = runOneClient(cmd, getConf(), threadOpts, new Status() {
+ if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
+ RunResult run = runOneClient(cmd, conf, con, threadOpts, new Status() {
+ @Override
public void setStatus(final String msg) throws IOException {
- LOG.info("client-" + Thread.currentThread().getName() + " " + msg);
+ LOG.info(msg);
}
});
- LOG.info("Finished " + Thread.currentThread().getName() + " in " + elapsedTime +
+ LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
"ms over " + threadOpts.perClientRunRows + " rows");
- return elapsedTime;
+ return run;
}
});
}
pool.shutdown();
+
for (int i = 0; i < threads.length; i++) {
try {
- timings[i] = threads[i].get();
+ results[i] = threads[i].get();
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
}
final String test = cmd.getSimpleName();
LOG.info("[" + test + "] Summary of timings (ms): "
- + Arrays.toString(timings));
- Arrays.sort(timings);
+ + Arrays.toString(results));
+ Arrays.sort(results);
long total = 0;
- for (int i = 0; i < timings.length; i++) {
- total += timings[i];
+ for (RunResult result : results) {
+ total += result.duration;
}
LOG.info("[" + test + "]"
- + "\tMin: " + timings[0] + "ms"
- + "\tMax: " + timings[timings.length - 1] + "ms"
- + "\tAvg: " + (total / timings.length) + "ms");
+ + "\tMin: " + results[0] + "ms"
+ + "\tMax: " + results[results.length - 1] + "ms"
+ + "\tAvg: " + (total / results.length) + "ms");
+
+ con.close();
+
+ return results;
}
/*
@@ -373,15 +459,16 @@ public class PerformanceEvaluation extends Configured implements Tool {
* @param cmd Command to run.
* @throws IOException
*/
- private void doMapReduce(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
- InterruptedException, ClassNotFoundException {
- Configuration conf = getConf();
+ static Job doMapReduce(TestOptions opts, final Configuration conf)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ final Class<? extends Test> cmd = determineCommandClass(opts.cmdName);
+ assert cmd != null;
Path inputDir = writeInputFile(conf, opts);
conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
- conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
- Job job = new Job(conf);
+ conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
+ Job job = Job.getInstance(conf);
job.setJarByClass(PerformanceEvaluation.class);
- job.setJobName("HBase Performance Evaluation");
+ job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setInputPaths(job, inputDir);
@@ -401,12 +488,13 @@ public class PerformanceEvaluation extends Configured implements Tool {
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
- DescriptiveStatistics.class, // commons-math
- ObjectMapper.class); // jackson-mapper-asl
+ Histogram.class, // yammer metrics
+ ObjectMapper.class); // jackson-mapper-asl
TableMapReduceUtil.initCredentials(job);
job.waitForCompletion(true);
+ return job;
}
/*
@@ -415,7 +503,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
* @return Directory that contains file written.
* @throws IOException
*/
- private Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
+ private static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
Path inputDir = new Path(jobdir, "inputs");
@@ -429,15 +517,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
Map<Integer, String> m = new TreeMap<Integer, String>();
Hash h = MurmurHash.getInstance();
int perClientRows = (opts.totalRows / opts.numClientThreads);
- ObjectMapper mapper = new ObjectMapper();
- mapper.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
try {
for (int i = 0; i < 10; i++) {
for (int j = 0; j < opts.numClientThreads; j++) {
TestOptions next = new TestOptions(opts);
next.startRow = (j * perClientRows) + (i * (perClientRows/10));
next.perClientRunRows = perClientRows / 10;
- String s = mapper.writeValueAsString(next);
+ String s = MAPPER.writeValueAsString(next);
+ LOG.info("maptask input=" + s);
int hash = h.hash(Bytes.toBytes(s));
m.put(hash, s);
}
@@ -481,54 +568,350 @@ public class PerformanceEvaluation extends Configured implements Tool {
/**
* Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
* This makes tracking all these arguments a little easier.
+ * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON
+ * serialization of this TestOptions class behave), and you need to add to the clone constructor
+ * below copying your new option from the 'that' to the 'this'. Look for 'clone' below.
*/
static class TestOptions {
+ String cmdName = null;
+ boolean nomapred = false;
+ boolean filterAll = false;
+ int startRow = 0;
+ float size = 1.0f;
+ int perClientRunRows = DEFAULT_ROWS_PER_GB;
+ int numClientThreads = 1;
+ int totalRows = DEFAULT_ROWS_PER_GB;
+ float sampleRate = 1.0f;
+ double traceRate = 0.0;
+ String tableName = TABLE_NAME;
+ boolean flushCommits = true;
+ boolean writeToWAL = true;
+ boolean autoFlush = false;
+ boolean oneCon = false;
+ boolean useTags = false;
+ int noOfTags = 1;
+ boolean reportLatency = false;
+ int multiGet = 0;
+ int randomSleep = 0;
+ boolean inMemoryCF = false;
+ int presplitRegions = 0;
+ String splitPolicy = null;
+ Compression.Algorithm compression = Compression.Algorithm.NONE;
+ BloomType bloomType = BloomType.ROW;
+ DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
+ boolean valueRandom = false;
+ boolean valueZipf = false;
+ int valueSize = DEFAULT_VALUE_LENGTH;
+ int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
+ int columns = 1;
+ int caching = 30;
+ boolean addColumns = true;
public TestOptions() {}
+ /**
+ * Clone constructor.
+ * @param that Object to copy from.
+ */
public TestOptions(TestOptions that) {
+ this.cmdName = that.cmdName;
this.nomapred = that.nomapred;
this.startRow = that.startRow;
+ this.size = that.size;
this.perClientRunRows = that.perClientRunRows;
this.numClientThreads = that.numClientThreads;
this.totalRows = that.totalRows;
this.sampleRate = that.sampleRate;
+ this.traceRate = that.traceRate;
this.tableName = that.tableName;
this.flushCommits = that.flushCommits;
this.writeToWAL = that.writeToWAL;
+ this.autoFlush = that.autoFlush;
+ this.oneCon = that.oneCon;
this.useTags = that.useTags;
this.noOfTags = that.noOfTags;
this.reportLatency = that.reportLatency;
this.multiGet = that.multiGet;
this.inMemoryCF = that.inMemoryCF;
this.presplitRegions = that.presplitRegions;
+ this.splitPolicy = that.splitPolicy;
this.compression = that.compression;
this.blockEncoding = that.blockEncoding;
this.filterAll = that.filterAll;
this.bloomType = that.bloomType;
+ this.valueRandom = that.valueRandom;
+ this.valueZipf = that.valueZipf;
+ this.valueSize = that.valueSize;
+ this.period = that.period;
+ this.randomSleep = that.randomSleep;
this.addColumns = that.addColumns;
+ this.columns = that.columns;
+ this.caching = that.caching;
}
- public boolean nomapred = false;
- public boolean filterAll = false;
- public int startRow = 0;
- public int perClientRunRows = ROWS_PER_GB;
- public int numClientThreads = 1;
- public int totalRows = ROWS_PER_GB;
- public float sampleRate = 1.0f;
- public String tableName = TABLE_NAME;
- public boolean flushCommits = true;
- public boolean writeToWAL = true;
- public boolean useTags = false;
- public int noOfTags = 1;
- public boolean reportLatency = false;
- public int multiGet = 0;
- boolean inMemoryCF = false;
- int presplitRegions = 0;
- public Compression.Algorithm compression = Compression.Algorithm.NONE;
- public BloomType bloomType = BloomType.ROW;
- public DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
- boolean addColumns = true;
+ public int getCaching() {
+ return this.caching;
+ }
+
+ public void setCaching(final int caching) {
+ this.caching = caching;
+ }
+
+ public int getColumns() {
+ return this.columns;
+ }
+
+ public void setColumns(final int columns) {
+ this.columns = columns;
+ }
+
+ public boolean isValueZipf() {
+ return valueZipf;
+ }
+
+ public void setValueZipf(boolean valueZipf) {
+ this.valueZipf = valueZipf;
+ }
+
+ public String getCmdName() {
+ return cmdName;
+ }
+
+ public void setCmdName(String cmdName) {
+ this.cmdName = cmdName;
+ }
+
+ public int getRandomSleep() {
+ return randomSleep;
+ }
+
+ public void setRandomSleep(int randomSleep) {
+ this.randomSleep = randomSleep;
+ }
+
+ public String getSplitPolicy() {
+ return splitPolicy;
+ }
+
+ public void setSplitPolicy(String splitPolicy) {
+ this.splitPolicy = splitPolicy;
+ }
+
+ public void setNomapred(boolean nomapred) {
+ this.nomapred = nomapred;
+ }
+
+ public void setFilterAll(boolean filterAll) {
+ this.filterAll = filterAll;
+ }
+
+ public void setStartRow(int startRow) {
+ this.startRow = startRow;
+ }
+
+ public void setSize(float size) {
+ this.size = size;
+ }
+
+ public void setPerClientRunRows(int perClientRunRows) {
+ this.perClientRunRows = perClientRunRows;
+ }
+
+ public void setNumClientThreads(int numClientThreads) {
+ this.numClientThreads = numClientThreads;
+ }
+
+ public void setTotalRows(int totalRows) {
+ this.totalRows = totalRows;
+ }
+
+ public void setSampleRate(float sampleRate) {
+ this.sampleRate = sampleRate;
+ }
+
+ public void setTraceRate(double traceRate) {
+ this.traceRate = traceRate;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public void setFlushCommits(boolean flushCommits) {
+ this.flushCommits = flushCommits;
+ }
+
+ public void setWriteToWAL(boolean writeToWAL) {
+ this.writeToWAL = writeToWAL;
+ }
+
+ public void setAutoFlush(boolean autoFlush) {
+ this.autoFlush = autoFlush;
+ }
+
+ public void setOneCon(boolean oneCon) {
+ this.oneCon = oneCon;
+ }
+
+ public void setUseTags(boolean useTags) {
+ this.useTags = useTags;
+ }
+
+ public void setNoOfTags(int noOfTags) {
+ this.noOfTags = noOfTags;
+ }
+
+ public void setReportLatency(boolean reportLatency) {
+ this.reportLatency = reportLatency;
+ }
+
+ public void setMultiGet(int multiGet) {
+ this.multiGet = multiGet;
+ }
+
+ public void setInMemoryCF(boolean inMemoryCF) {
+ this.inMemoryCF = inMemoryCF;
+ }
+
+ public void setPresplitRegions(int presplitRegions) {
+ this.presplitRegions = presplitRegions;
+ }
+
+ public void setCompression(Compression.Algorithm compression) {
+ this.compression = compression;
+ }
+
+ public void setBloomType(BloomType bloomType) {
+ this.bloomType = bloomType;
+ }
+
+ public void setBlockEncoding(DataBlockEncoding blockEncoding) {
+ this.blockEncoding = blockEncoding;
+ }
+
+ public void setValueRandom(boolean valueRandom) {
+ this.valueRandom = valueRandom;
+ }
+
+ public void setValueSize(int valueSize) {
+ this.valueSize = valueSize;
+ }
+
+ public void setPeriod(int period) {
+ this.period = period;
+ }
+
+ public boolean isNomapred() {
+ return nomapred;
+ }
+
+ public boolean isFilterAll() {
+ return filterAll;
+ }
+
+ public int getStartRow() {
+ return startRow;
+ }
+
+ public float getSize() {
+ return size;
+ }
+
+ public int getPerClientRunRows() {
+ return perClientRunRows;
+ }
+
+ public int getNumClientThreads() {
+ return numClientThreads;
+ }
+
+ public int getTotalRows() {
+ return totalRows;
+ }
+
+ public float getSampleRate() {
+ return sampleRate;
+ }
+
+ public double getTraceRate() {
+ return traceRate;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public boolean isFlushCommits() {
+ return flushCommits;
+ }
+
+ public boolean isWriteToWAL() {
+ return writeToWAL;
+ }
+
+ public boolean isAutoFlush() {
+ return autoFlush;
+ }
+
+ public boolean isUseTags() {
+ return useTags;
+ }
+
+ public int getNoOfTags() {
+ return noOfTags;
+ }
+
+ public boolean isReportLatency() {
+ return reportLatency;
+ }
+
+ public int getMultiGet() {
+ return multiGet;
+ }
+
+ public boolean isInMemoryCF() {
+ return inMemoryCF;
+ }
+
+ public int getPresplitRegions() {
+ return presplitRegions;
+ }
+
+ public Compression.Algorithm getCompression() {
+ return compression;
+ }
+
+ public DataBlockEncoding getBlockEncoding() {
+ return blockEncoding;
+ }
+
+ public boolean isValueRandom() {
+ return valueRandom;
+ }
+
+ public int getValueSize() {
+ return valueSize;
+ }
+
+ public int getPeriod() {
+ return period;
+ }
+
+ public BloomType getBloomType() {
+ return bloomType;
+ }
+
+ public boolean isOneCon() {
+ return oneCon;
+ }
+
+ public boolean getAddColumns() {
+ return addColumns;
+ }
+
+ public void setAddColumns(boolean addColumns) {
+ this.addColumns = addColumns;
+ }
}
/*
@@ -539,56 +922,125 @@ public class PerformanceEvaluation extends Configured implements Tool {
// Below is make it so when Tests are all running in the one
// jvm, that they each have a differently seeded Random.
private static final Random randomSeed = new Random(System.currentTimeMillis());
+
private static long nextRandomSeed() {
return randomSeed.nextLong();
}
+ private final int everyN;
+
protected final Random rand = new Random(nextRandomSeed());
protected final Configuration conf;
protected final TestOptions opts;
private final Status status;
+ private final Sampler<?> traceSampler;
+ private final SpanReceiverHost receiverHost;
protected HConnection connection;
- protected HTableInterface table;
+
+ private String testName;
+ private Histogram latency;
+ private Histogram valueSize;
+ private RandomDistribution.Zipf zipf;
/**
- * Note that all subclasses of this class must provide a public contructor
+ * Note that all subclasses of this class must provide a public constructor
* that has the exact same list of arguments.
*/
- Test(final Configuration conf, final TestOptions options, final Status status) {
- this.conf = conf;
+ Test(final HConnection con, final TestOptions options, final Status status) {
+ // When no connection is handed in, fall back to a fresh default configuration;
+ // testSetup() creates the connection later unless opts.oneCon is set.
+ this.connection = con;
+ this.conf = con == null ? HBaseConfiguration.create() : this.connection.getConfiguration();
this.opts = options;
this.status = status;
+ this.testName = this.getClass().getSimpleName();
+ receiverHost = SpanReceiverHost.getInstance(conf);
+ // Pick the trace sampler from traceRate: >= 1 always, (0, 1) probabilistic, else never.
+ if (options.traceRate >= 1.0) {
+ this.traceSampler = Sampler.ALWAYS;
+ } else if (options.traceRate > 0.0) {
+ this.traceSampler = new ProbabilitySampler(options.traceRate);
+ } else {
+ this.traceSampler = Sampler.NEVER;
+ }
+ // testTimed() evaluates "i % everyN", so everyN must never be zero. The cast yields 0
+ // when sampleRate is greater than 1 (quotient below 1) or when the quotient is NaN
+ // (totalRows == 0); clamp to 1 so that every row is executed in those cases.
+ everyN = Math.max(1, (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)));
+ if (options.isValueZipf()) {
+ this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.1);
+ }
+ LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
}
- private String generateStatus(final int sr, final int i, final int lr) {
- return sr + "/" + i + "/" + lr;
+ /**
+ * Picks the length of the next value to generate.
+ * @param r source of randomness used when value sizes vary randomly
+ * @return a length derived from {@code r} when valueRandom is set, from the Zipf
+ * distribution when valueZipf is set, otherwise the fixed {@code opts.valueSize}
+ */
+ int getValueLength(final Random r) {
+ if (this.opts.isValueRandom()) {
+ return Math.abs(r.nextInt() % opts.valueSize);
+ }
+ if (this.opts.isValueZipf()) {
+ return Math.abs(this.zipf.nextInt());
+ }
+ return opts.valueSize;
+ }
+
+ /**
+ * Records the value size of every result in {@code rs}; does nothing when {@code rs}
+ * is null or value sizes are not random.
+ */
+ void updateValueSize(final Result [] rs) throws IOException {
+ if (rs != null && isRandomValueSize()) {
+ for (Result each : rs) {
+ updateValueSize(each);
+ }
+ }
+ }
+
+ /**
+ * Sums the value lengths of all cells in {@code r} and records the total; does nothing
+ * when {@code r} is null or value sizes are not random.
+ */
+ void updateValueSize(final Result r) throws IOException {
+ if (r == null || !isRandomValueSize()) {
+ return;
+ }
+ final CellScanner cells = r.cellScanner();
+ int totalValueBytes = 0;
+ while (cells.advance()) {
+ totalValueBytes += cells.current().getValueLength();
+ }
+ updateValueSize(totalValueBytes);
+ }
+
+ /** Adds {@code valueSize} to the value-size histogram when value sizes are random. */
+ void updateValueSize(final int valueSize) {
+ if (isRandomValueSize()) {
+ this.valueSize.update(valueSize);
+ }
+ }
+
+ /**
+ * Builds the progress line: the three numbers joined by '/', the short latency report
+ * and, when value sizes are random, the short value-size report.
+ */
+ String generateStatus(final int sr, final int i, final int lr) {
+ final StringBuilder line = new StringBuilder();
+ line.append(sr).append("/").append(i).append("/").append(lr);
+ line.append(", latency ").append(getShortLatencyReport());
+ if (isRandomValueSize()) {
+ line.append(", value size ").append(getShortValueSizeReport());
+ }
+ return line.toString();
+ }
+
+ /**
+ * Gate used by the updateValueSize overloads and by generateStatus.
+ * Only the valueRandom option is consulted here; the valueZipf option is not.
+ * @return the value of {@code opts.valueRandom}
+ */
+ boolean isRandomValueSize() {
+ return opts.valueRandom;
}
protected int getReportingPeriod() {
- int period = opts.perClientRunRows / 10;
- return period == 0 ? opts.perClientRunRows : period;
+ // testTimed() uses the returned value as a modulus ("i % getReportingPeriod()"), so it
+ // must be strictly positive. Fall back to the per-client row count (at least 1) when
+ // the configured period is not usable, mirroring the zero guard of the previous code.
+ if (opts.period > 0) {
+ return opts.period;
+ }
+ return Math.max(1, opts.perClientRunRows);
+ }
+
+ /**
+ * Returns the latency histogram. It is created by testSetup() and receives one sample,
+ * in microseconds, for every row executed by testTimed(); it is null until testSetup()
+ * has run.
+ */
+ public Histogram getLatency() {
+ return latency;
}
void testSetup() throws IOException {
- this.connection = HConnectionManager.createConnection(conf);
- this.table = connection.getTable(opts.tableName);
- this.table.setAutoFlush(false, true);
+ if (!opts.oneCon) {
+ this.connection = HConnectionManager.createConnection(conf);
+ }
+ onStartup();
+ latency = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
+ valueSize = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500));
}
+ abstract void onStartup() throws IOException;
+
void testTakedown() throws IOException {
- if (opts.flushCommits) {
- this.table.flushCommits();
+ // Release the connection and the span receivers even when reporting or the subclass
+ // hook throws, so a failed run does not leak them.
+ try {
+ reportLatency();
+ reportValueSize();
+ onTakedown();
+ } finally {
+ try {
+ // With oneCon the connection was handed in rather than created by testSetup(),
+ // so it is left open here.
+ if (!opts.oneCon) {
+ connection.close();
+ }
+ } finally {
+ receiverHost.closeReceivers();
+ }
}
- table.close();
- connection.close();
}
+ abstract void onTakedown() throws IOException;
+
/*
* Run test
* @return Elapsed time.
* @throws IOException
*/
- long test() throws IOException {
+ long test() throws IOException, InterruptedException {
testSetup();
LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
final long startTime = System.nanoTime();
@@ -600,40 +1052,121 @@ public class PerformanceEvaluation extends Configured implements Tool {
return (System.nanoTime() - startTime) / 1000000;
}
+ int getStartRow() {
+ return opts.startRow;
+ }
+
+ int getLastRow() {
+ return getStartRow() + opts.perClientRunRows;
+ }
+
/**
* Provides an extension point for tests that don't want a per row invocation.
*/
- void testTimed() throws IOException {
- int lastRow = opts.startRow + opts.perClientRunRows;
+ void testTimed() throws IOException, InterruptedException {
+ // The row range comes from overridable hooks so subclasses can change it.
+ int startRow = getStartRow();
+ int lastRow = getLastRow();
// Report on completion of 1/10th of total.
- for (int i = opts.startRow; i < lastRow; i++) {
- testRow(i);
+ for (int i = startRow; i < lastRow; i++) {
+ // Sampling: only rows whose index is a multiple of everyN are executed.
+ if (i % everyN != 0) continue;
+ long startTime = System.nanoTime();
+ // Each executed row runs inside its own trace span, closed even if testRow throws.
+ TraceScope scope = Trace.startSpan("test row", traceSampler);
+ try {
+ testRow(i);
+ } finally {
+ scope.close();
+ }
+ // Nanoseconds divided by 1000: latency samples are recorded in microseconds.
+ latency.update((System.nanoTime() - startTime) / 1000);
if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
- status.setStatus(generateStatus(opts.startRow, i, lastRow));
+ status.setStatus(generateStatus(startRow, i, lastRow));
}
}
}
+ /**
+ * Publishes the latency histogram through the status callback: a header line with the
+ * sample count, followed by the histogram statistics.
+ * @throws IOException if the status callback fails
+ */
+ private void reportLatency() throws IOException {
+ final String header = testName + " latency log (microseconds), on " +
+ latency.count() + " measures";
+ status.setStatus(header);
+ reportHistogram(this.latency);
+ }
+
+ /**
+ * Publishes the value-size histogram through the status callback: a header line with
+ * the sample count, followed by the histogram statistics.
+ * @throws IOException if the status callback fails
+ */
+ private void reportValueSize() throws IOException {
+ final String header = testName + " valueSize after " +
+ valueSize.count() + " measures";
+ status.setStatus(header);
+ reportHistogram(this.valueSize);
+ }
+
+ /**
+ * Emits one status line per statistic of {@code h}, each prefixed with the test name:
+ * min, mean, standard deviation, a series of percentiles from the 50th up to the
+ * 99.999th, and max.
+ * @param h histogram to report
+ * @throws IOException if the status callback fails
+ */
+ private void reportHistogram(final Histogram h) throws IOException {
+ // Percentiles are read from a single snapshot; min/mean/stdDev/max come from h itself.
+ Snapshot sn = h.getSnapshot();
+ status.setStatus(testName + " Min = " + h.min());
+ status.setStatus(testName + " Avg = " + h.mean());
+ status.setStatus(testName + " StdDev = " + h.stdDev());
+ status.setStatus(testName + " 50th = " + sn.getMedian());
+ status.setStatus(testName + " 75th = " + sn.get75thPercentile());
+ status.setStatus(testName + " 95th = " + sn.get95thPercentile());
+ status.setStatus(testName + " 99th = " + sn.get99thPercentile());
+ status.setStatus(testName + " 99.9th = " + sn.get999thPercentile());
+ status.setStatus(testName + " 99.99th = " + sn.getValue(0.9999));
+ status.setStatus(testName + " 99.999th = " + sn.getValue(0.99999));
+ status.setStatus(testName + " Max = " + h.max());
+ }
+
+ /**
+ * @return Subset of the histograms' calculation.
+ */
+ public String getShortLatencyReport() {
+ return YammerHistogramUtils.getShortHistogramReport(this.latency);
+ }
+
+ /**
+ * @return Subset of the histograms' calculation.
+ */
+ public String getShortValueSizeReport() {
+ return YammerHistogramUtils.getShortHistogramReport(this.valueSize);
+ }
+
/*
* Test for individual row.
* @param i Row index.
*/
- abstract void testRow(final int i) throws IOException;
+ abstract void testRow(final int i) throws IOException, InterruptedException;
}
+ /**
+ * Base class for tests that operate on a single table: the table handle is obtained
+ * from the connection in onStartup() and closed in onTakedown().
+ */
+ static abstract class TableTest extends Test {
+ protected HTableInterface table;
- @SuppressWarnings("unused")
- static class RandomSeekScanTest extends Test {
- RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
+ TableTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
+ }
+
+ // NOTE(review): the testSetup() code removed by this patch called
+ // table.setAutoFlush(false, true); nothing visible here applies opts.autoFlush --
+ // confirm it is handled elsewhere.
+ @Override
+ void onStartup() throws IOException {
+ this.table = connection.getTable(TableName.valueOf(opts.tableName));
+ }
+
+ // NOTE(review): the removed testTakedown() called table.flushCommits() when
+ // opts.flushCommits was set; here the table is only closed -- confirm that is intended.
+ @Override
+ void onTakedown() throws IOException {
+ table.close();
+ }
+ }
+
+ static class RandomSeekScanTest extends TableTest {
+ RandomSeekScanTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
}
@Override
void testRow(final int i) throws IOException {
Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows));
+ scan.setCaching(opts.caching);
FilterList list = new FilterList();
if (opts.addColumns) {
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+ } else {
+ scan.addFamily(FAMILY_NAME);
}
if (opts.filterAll) {
list.addFilter(new FilterAllFilter());
@@ -641,7 +1174,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
list.addFilter(new WhileMatchFilter(new PageFilter(120)));
scan.setFilter(list);
ResultScanner s = this.table.getScanner(scan);
- for (Result rr; (rr = s.next()) != null;) ;
+ for (Result rr; (rr = s.next()) != null;) {
+ updateValueSize(rr);
+ }
s.close();
}
@@ -653,28 +1188,31 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
- @SuppressWarnings("unused")
- static abstract class RandomScanWithRangeTest extends Test {
- RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
+ static abstract class RandomScanWithRangeTest extends TableTest {
+ RandomScanWithRangeTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
}
@Override
void testRow(final int i) throws IOException {
Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
+ scan.setCaching(opts.caching);
if (opts.filterAll) {
scan.setFilter(new FilterAllFilter());
}
if (opts.addColumns) {
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+ } else {
+ scan.addFamily(FAMILY_NAME);
}
- ResultScanner s = this.table.getScanner(scan);
+ Result r = null;
int count = 0;
- for (Result rr; (rr = s.next()) != null;) {
+ ResultScanner s = this.table.getScanner(scan);
+ for (; (r = s.next()) != null;) {
+ updateValueSize(r);
count++;
}
-
if (i % 100 == 0) {
LOG.info(String.format("Scan for key range %s - %s returned %s rows",
Bytes.toString(startAndStopRow.getFirst()),
@@ -700,8 +1238,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
- RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
+ RandomScanWithRange10Test(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
}
@Override
@@ -711,8 +1249,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
- RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
+ RandomScanWithRange100Test(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
}
@Override
@@ -722,8 +1260,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
- RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
+ RandomScanWithRange1000Test(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
}
@Override
@@ -733,8 +1271,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
- RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
+ RandomScanWithRange10000Test(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
}
@Override
@@ -743,60 +1281,48 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
}
- static class RandomReadTest extends Test {
- private final int everyN;
- private final double[] times;
+ static class RandomReadTest extends TableTest {
private ArrayList<Get> gets;
- int idx = 0;
+ private Random rd = new Random();
- RandomReadTest(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
- everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
- LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
+ RandomReadTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
if (opts.multiGet > 0) {
LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
this.gets = new ArrayList<Get>(opts.multiGet);
}
- if (opts.reportLatency) {
- this.times = new double[(int) Math.ceil(opts.perClientRunRows * opts.sampleRate / Math.max(1, opts.multiGet))];
- } else {
- this.times = null;
- }
}
@Override
- void testRow(final int i) throws IOException {
- if (i % everyN == 0) {
- Get get = new Get(getRandomRow(this.rand, opts.totalRows));
- if (opts.addColumns) {
- get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
- }
- if (opts.filterAll) {
- get.setFilter(new FilterAllFilter());
- }
- if (opts.multiGet > 0) {
- this.gets.add(get);
- if (this.gets.size() == opts.multiGet) {
- long start = System.nanoTime();
- this.table.get(this.gets);
- if (opts.reportLatency) {
- times[idx++] = (System.nanoTime() - start) / 1e6;
- }
- this.gets.clear();
- }
- } else {
- long start = System.nanoTime();
- this.table.get(get);
- if (opts.reportLatency) {
- times[idx++] = (System.nanoTime() - start) / 1e6;
- }
+ void testRow(final int i) throws IOException, InterruptedException {
+ if (opts.randomSleep > 0) {
+ Thread.sleep(rd.nextInt(opts.randomSleep));
+ }
+ Get get = new Get(getRandomRow(this.rand, opts.totalRows));
+ if (opts.addColumns) {
+ get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+ } else {
+ get.addFamily(FAMILY_NAME);
+ }
+ if (opts.filterAll) {
+ get.setFilter(new FilterAllFilter());
+ }
+ if (LOG.isTraceEnabled()) LOG.trace(get.toString());
+ if (opts.multiGet > 0) {
+ this.gets.add(get);
+ if (this.gets.size() == opts.multiGet) {
+ Result [] rs = this.table.get(this.gets);
+ updateValueSize(rs);
+ this.gets.clear();
}
+ } else {
+ updateValueSize(this.table.get(get));
}
}
@Override
protected int getReportingPeriod() {
- int period = opts.perClientRunRows / 100;
+ int period = opts.perClientRunRows / 10;
return period == 0 ? opts.perClientRunRows : period;
}
@@ -807,61 +1333,47 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.gets.clear();
}
super.testTakedown();
- if (opts.reportLatency) {
- Arrays.sort(times);
- DescriptiveStatistics ds = new DescriptiveStatistics();
- for (double t : times) {
- ds.addValue(t);
- }
- LOG.info("randomRead latency log (ms), on " + times.length + " measures");
- LOG.info("99.9999% = " + ds.getPercentile(99.9999d));
- LOG.info(" 99.999% = " + ds.getPercentile(99.999d));
- LOG.info(" 99.99% = " + ds.getPercentile(99.99d));
- LOG.info(" 99.9% = " + ds.getPercentile(99.9d));
- LOG.info(" 99% = " + ds.getPercentile(99d));
- LOG.info(" 95% = " + ds.getPercentile(95d));
- LOG.info(" 90% = " + ds.getPercentile(90d));
- LOG.info(" 80% = " + ds.getPercentile(80d));
- LOG.info("Standard Deviation = " + ds.getStandardDeviation());
- LOG.info("Mean = " + ds.getMean());
- }
}
}
- static class RandomWriteTest extends Test {
- RandomWriteTest(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
+ static class RandomWriteTest extends TableTest {
+ RandomWriteTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
}
@Override
void testRow(final int i) throws IOException {
byte[] row = getRandomRow(this.rand, opts.totalRows);
Put put = new Put(row);
- byte[] value = generateData(this.rand, VALUE_LENGTH);
- if (opts.useTags) {
- byte[] tag = generateData(this.rand, TAG_LENGTH);
- Tag[] tags = new Tag[opts.noOfTags];
- for (int n = 0; n < opts.noOfTags; n++) {
- Tag t = new Tag((byte) n, tag);
- tags[n] = t;
+ for (int column = 0; column < opts.columns; column++) {
+ byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
+ byte[] value = generateData(this.rand, getValueLength(this.rand));
+ if (opts.useTags) {
+ byte[] tag = generateData(this.rand, TAG_LENGTH);
+ Tag[] tags = new Tag[opts.noOfTags];
+ for (int n = 0; n < opts.noOfTags; n++) {
+ Tag t = new Tag((byte) n, tag);
+ tags[n] = t;
+ }
+ KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
+ value, tags);
+ put.add(kv);
+ updateValueSize(kv.getValueLength());
+ } else {
+ put.add(FAMILY_NAME, qualifier, value);
+ updateValueSize(value.length);
}
- KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
- value, tags);
- put.add(kv);
- } else {
- put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
table.put(put);
}
}
-
- static class ScanTest extends Test {
+ static class ScanTest extends TableTest {
private ResultScanner testScanner;
- ScanTest(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
+ ScanTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
}
@Override
@@ -877,23 +1389,135 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testRow(final int i) throws IOException {
if (this.testScanner == null) {
Scan scan = new Scan(format(opts.startRow));
- scan.setCaching(30);
+ scan.setCaching(opts.caching);
if (opts.addColumns) {
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+ } else {
+ scan.addFamily(FAMILY_NAME);
}
if (opts.filterAll) {
scan.setFilter(new FilterAllFilter());
}
this.testScanner = table.getScanner(scan);
}
- testScanner.next();
+ Result r = testScanner.next();
+ updateValueSize(r);
+ }
+ }
+
+ /**
+ * Base class for operations that are CAS-like; that read a value and then set it based off what
+ * they read. In this category are increment, append, checkAndPut, etc.
+ *
+ * <p>These operations also want some concurrency going on. Usually when these tests run, they
+ * operate in their own part of the key range. In CASTableTest, we have them all overlap on the
+ * same key space. We do this with our getStartRow and getLastRow overrides.
+ */
+ static abstract class CASTableTest extends TableTest {
+ // Column qualifier derived from the concrete test class's simple name.
+ private final byte [] qualifier;
+ CASTableTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
+ qualifier = Bytes.toBytes(this.getClass().getSimpleName());
+ }
+
+ byte [] getQualifier() {
+ return this.qualifier;
}
+ // Always start at row 0 instead of opts.startRow.
+ @Override
+ int getStartRow() {
+ return 0;
+ }
+
+ // The range is [0, perClientRunRows) regardless of opts.startRow.
+ @Override
+ int getLastRow() {
+ return opts.perClientRunRows;
+ }
+ }
+
+ /** CAS-style test that increments one column of row {@code format(i)} by one per invocation. */
+ static class IncrementTest extends CASTableTest {
+ IncrementTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
+ }
+
+ @Override
+ void testRow(final int i) throws IOException {
+ Increment increment = new Increment(format(i));
+ // Upper-case long suffix: a lower-case 'l' is easily misread as the digit '1'.
+ increment.addColumn(FAMILY_NAME, getQualifier(), 1L);
+ updateValueSize(this.table.increment(increment));
+ }
}
- static class SequentialReadTest extends Test {
- SequentialReadTest(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
+ /** CAS-style test that appends the formatted row key to one column of that same row. */
+ static class AppendTest extends CASTableTest {
+ AppendTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
+ }
+
+ @Override
+ void testRow(final int i) throws IOException {
+ final byte [] rowKey = format(i);
+ final Append op = new Append(rowKey);
+ // The row key doubles as the appended value.
+ op.add(FAMILY_NAME, getQualifier(), rowKey);
+ final Result appended = this.table.append(op);
+ updateValueSize(appended);
+ }
+ }
+
+ /** CAS-style test that writes a known value and then runs checkAndMutate against it. */
+ static class CheckAndMutateTest extends CASTableTest {
+ CheckAndMutateTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
+ }
+
+ @Override
+ void testRow(final int i) throws IOException {
+ final byte [] rowKey = format(i);
+ final byte [] column = getQualifier();
+ // Seed the cell first so the subsequent check finds the expected value.
+ final Put seed = new Put(rowKey);
+ seed.add(FAMILY_NAME, column, rowKey);
+ this.table.put(seed);
+ final RowMutations batch = new RowMutations(rowKey);
+ batch.add(seed);
+ this.table.checkAndMutate(rowKey, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, rowKey,
+ batch);
+ }
+ }
+
+ /** CAS-style test that writes a known value and then runs checkAndPut against it. */
+ static class CheckAndPutTest extends CASTableTest {
+ CheckAndPutTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
+ }
+
+ @Override
+ void testRow(final int i) throws IOException {
+ final byte [] rowKey = format(i);
+ final byte [] column = getQualifier();
+ // Seed the cell first so the subsequent check finds the expected value.
+ final Put seed = new Put(rowKey);
+ seed.add(FAMILY_NAME, column, rowKey);
+ this.table.put(seed);
+ this.table.checkAndPut(rowKey, FAMILY_NAME, getQualifier(), rowKey, seed);
+ }
+ }
+
+ /** CAS-style test that writes a known value and then runs checkAndDelete against it. */
+ static class CheckAndDeleteTest extends CASTableTest {
+ CheckAndDeleteTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
+ }
+
+ @Override
+ void testRow(final int i) throws IOException {
+ final byte [] rowKey = format(i);
+ final byte [] column = getQualifier();
+ // Seed the cell first so the subsequent check finds the expected value.
+ final Put seed = new Put(rowKey);
+ seed.add(FAMILY_NAME, column, rowKey);
+ this.table.put(seed);
+ final Delete removal = new Delete(seed.getRow());
+ removal.deleteColumn(FAMILY_NAME, getQualifier());
+ this.table.checkAndDelete(rowKey, FAMILY_NAME, getQualifier(), rowKey, removal);
+ }
+ }
+
+ static class SequentialReadTest extends TableTest {
+ SequentialReadTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
}
@Override
@@ -905,53 +1529,59 @@ public class PerformanceEvaluation extends Configured implements Tool {
if (opts.filterAll) {
get.setFilter(new FilterAllFilter());
}
- table.get(get);
+ updateValueSize(table.get(get));
}
}
- static class SequentialWriteTest extends Test {
- SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
+ static class SequentialWriteTest extends TableTest {
+ SequentialWriteTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
}
@Override
void testRow(final int i) throws IOException {
byte[] row = format(i);
Put put = new Put(row);
- byte[] value = generateData(this.rand, VALUE_LENGTH);
- if (opts.useTags) {
- byte[] tag = generateData(this.rand, TAG_LENGTH);
- Tag[] tags = new Tag[opts.noOfTags];
- for (int n = 0; n < opts.noOfTags; n++) {
- Tag t = new Tag((byte) n, tag);
- tags[n] = t;
+ for (int column = 0; column < opts.columns; column++) {
+ byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
+ byte[] value = generateData(this.rand, getValueLength(this.rand));
+ if (opts.useTags) {
+ byte[] tag = generateData(this.rand, TAG_LENGTH);
+ Tag[] tags = new Tag[opts.noOfTags];
+ for (int n = 0; n < opts.noOfTags; n++) {
+ Tag t = new Tag((byte) n, tag);
+ tags[n] = t;
+ }
+ KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
+ value, tags);
+ put.add(kv);
+ updateValueSize(kv.getValueLength());
+ } else {
+ put.add(FAMILY_NAME, qualifier, value);
+ updateValueSize(value.length);
}
- KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
- value, tags);
- put.add(kv);
- } else {
- put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
table.put(put);
}
}
- static class FilteredScanTest extends Test {
+ static class FilteredScanTest extends TableTest {
protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
- FilteredScanTest(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
+ FilteredScanTest(HConnection con, TestOptions options, Status status) {
+ super(con, options, status);
}
@Override
void testRow(int i) throws IOException {
- byte[] value = generateData(this.rand, VALUE_LENGTH);
+ byte[] value = generateData(this.rand, getValueLength(this.rand));
Scan scan = constructScan(value);
ResultScanner scanner = null;
try {
scanner = this.table.getScanner(scan);
- while (scanner.next() != null) {
+ for (Result r = null; (r = scanner.next()) != null;) {
+ updateValueSize(r);
}
} finally {
if (scanner != null) scanner.close();
@@ -961,7 +1591,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
protected Scan constructScan(byte[] valuePrefix) throws IOException {
FilterList list = new FilterList();
Filter filter = new SingleColumnValueFilter(
- FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
+ FAMILY_NAME, COLUMN_ZERO, CompareFilter.CompareOp.EQUAL,
new BinaryComparator(valuePrefix)
);
list.addFilter(filter);
@@ -969,8 +1599,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
list.addFilter(new FilterAllFilter());
}
Scan scan = new Scan();
+ scan.setCaching(opts.caching);
if (opts.addColumns) {
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+ } else {
+ scan.addFamily(FAMILY_NAME);
}
scan.setFilter(list);
return scan;
@@ -983,11 +1616,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
* @param timeMs Time taken in milliseconds.
* @return String value with label, ie '123.76 MB/s'
*/
- private static String calculateMbps(int rows, long timeMs) {
- // MB/s = ((totalRows * ROW_SIZE_BYTES) / totalTimeMS)
- // * 1000 MS_PER_SEC / (1024 * 1024) BYTES_PER_MB
- BigDecimal rowSize =
- BigDecimal.valueOf(ROW_LENGTH + VALUE_LENGTH + FAMILY_NAME.length + QUALIFIER_NAME.length);
+ private static String calculateMbps(int rows, long timeMs, final int valueSize, int columns) {
+ BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
+ ((valueSize + FAMILY_NAME.length + COLUMN_ZERO.length) * columns));
BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
.divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
.divide(BYTES_PER_MB, CXT);
@@ -1018,7 +1649,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
*/
public static byte[] generateData(final Random r, int length) {
byte [] b = new byte [length];
- int i = 0;
+ int i;
for(i = 0; i < (length-8); i += 8) {
b[i] = (byte) (65 + r.nextInt(26));
@@ -1044,25 +1675,25 @@ public class PerformanceEvaluation extends Configured implements Tool {
*/
@Deprecated
public static byte[] generateValue(final Random r) {
- return generateData(r, VALUE_LENGTH);
+ return generateData(r, DEFAULT_VALUE_LENGTH);
}
static byte [] getRandomRow(final Random random, final int totalRows) {
return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
}
- static long runOneClient(final Class<? extends Test> cmd, Configuration conf, TestOptions opts,
- final Status status)
- throws IOException {
+ static RunResult runOneClient(final Class<? extends Test> cmd, Configuration conf, HConnection con,
+ TestOptions opts, final Status status)
+ throws IOException, InterruptedException {
status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " +
opts.perClientRunRows + " rows");
- long totalElapsedTime = 0;
+ long totalElapsedTime;
final Test t;
try {
Constructor<? extends Test> constructor =
- cmd.getDeclaredConstructor(Configuration.class, TestOptions.class, Status.class);
- t = constructor.newInstance(conf, opts, status);
+ cmd.getDeclaredConstructor(HConnection.class, TestOptions.class, Status.class);
+ t = constructor.newInstance(con, opts, status);
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Invalid command class: " +
cmd.getName() + ". It does not provide a constructor as described by " +
@@ -1075,50 +1706,74 @@ public class PerformanceEvaluation extends Configured implements Tool {
status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
"ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
- " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime) + ")");
- return totalElapsedTime;
+ " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
+ getAverageValueLength(opts), opts.columns) + ")");
+
+ return new RunResult(totalElapsedTime, t.getLatency());
+ }
+
+ /**
+ * Value length used for the throughput figure: half of {@code valueSize} (integer
+ * division) when valueRandom is set, otherwise {@code valueSize} itself.
+ */
+ private static int getAverageValueLength(final TestOptions opts) {
+ if (opts.valueRandom) {
+ return opts.valueSize / 2;
+ }
+ return opts.valueSize;
}
private void runTest(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
InterruptedException, ClassNotFoundException {
- HBaseAdmin admin = null;
+ // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
+ // the TestOptions introspection for us and dump the output in a readable format.
+ LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts));
+ HBaseAdmin admin = new HBaseAdmin(getConf());
try {
- admin = new HBaseAdmin(getConf());
checkTable(admin, opts);
} finally {
- if (admin != null) admin.close();
+ admin.close();
}
+
if (opts.nomapred) {
- doLocalClients(cmd, opts);
+ doLocalClients(opts, getConf());
} else {
- doMapReduce(cmd, opts);
+ doMapReduce(opts, getConf());
}
}
protected void printUsage() {
- printUsage(null);
+ printUsage(this.getClass().getName(), null);
}
- protected void printUsage(final String message) {
+ protected static void printUsage(final String message) {
+ printUsage(PerformanceEvaluation.class.getName(), message);
+ }
+
+ protected static void printUsageAndExit(final String message, final int exitCode) {
+ printUsage(message);
+ System.exit(exitCode);
+ }
+
+ /**
+ * Prints the optional message followed by the command-line usage text to
+ * {@code System.err}.
+ *
+ * @param className name shown in the usage line and in the example invocations
+ * @param message text printed before the usage; skipped when null or empty
+ */
+ protected static void printUsage(final String className, final String message) {
if (message != null && message.length() > 0) {
System.err.println(message);
}
- System.err.println("Usage: java " + this.getClass().getName() + " \\");
- System.err.println(" [--nomapred] [--rows=ROWS] [--table=NAME] \\");
- System.err.println(" [--compress=TYPE] [--blockEncoding=TYPE] " +
- "[-D<property=value>]* <command> <nclients>");
+ System.err.println("Usage: java " + className + " \\");
+ System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>");
System.err.println();
System.err.println("Options:");
System.err.println(" nomapred Run multiple clients using threads " +
"(rather than use mapreduce)");
System.err.println(" rows Rows each client runs. Default: One million");
+ System.err.println(" size Total size in GiB. Mutually exclusive with --rows. " +
+ "Default: 1.0.");
System.err.println(" sampleRate Execute test on a sample of total " +
"rows. Only supported by randomRead. Default: 1.0");
+ System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " +
+ "Default: 0");
System.err.println(" table Alternate table name. Default: 'TestTable'");
+ System.err.println(" multiGet If >0, when doing RandomRead, perform multiple gets " +
+ "instead of single gets. Default: 0");
System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'");
System.err.println(" flushCommits Used to determine if the test should flush the table. " +
"Default: false");
System.err.println(" writeToWAL Set writeToWAL on puts. Default: True");
+ System.err.println(" autoFlush Set autoFlush on htable. Default: False");
+ System.err.println(" oneCon all the threads share the same connection. Default: False");
System.err.println(" presplit Create presplit table. Recommended for accurate perf " +
"analysis (see guide). Default: disabled");
System.err.println(" inmemory Tries to keep the HFiles of the CF " +
@@ -1131,18 +1786,30 @@ public class PerformanceEvaluation extends Configured implements Tool {
System.err.println(" filterAll Helps to filter out all the rows on the server side"
+ " there by not returning any thing back to the client. Helps to check the server side"
+ " performance. Uses FilterAllFilter internally. ");
- System.err.println(" latency Set to report operation latencies. " +
- "Currently only supported by randomRead test. Default: False");
+ System.err.println(" latency Set to report operation latencies. Default: False");
System.err.println(" bloomFilter Bloom filter type, one of " + Arrays.toString(BloomType.values()));
+ System.err.println(" valueSize Pass value size to use: Default: 1024");
+ System.err.println(" valueRandom Set if we should vary value size between 0 and " +
+ "'valueSize'; set on read for stats on size: Default: Not set.");
+ System.err.println(" valueZipf Set if we should vary value size between 0 and " +
+ "'valueSize' in zipf form: Default: Not set.");
+ System.err.println(" period Report every 'period' rows: " +
+ "Default: opts.perClientRunRows / 10");
+ System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true");
+ System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table.");
+ System.err.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0");
+ System.err.println(" columns Columns to write per row. Default: 1");
+ System.err.println(" caching Scan caching to use. Default: 30");
System.err.println();
System.err.println(" Note: -D properties will be applied to the conf used. ");
System.err.println(" For example: ");
- System.err.println(" -Dmapred.output.compress=true");
- System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true");
+ System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true");
System.err.println(" -Dmapreduce.task.timeout=60000");
System.err.println();
System.err.println("Command:");
- for (CmdDescriptor command : commands.values()) {
+ for (CmdDescriptor command : COMMANDS.values()) {
System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
}
System.err.println();
@@ -1151,163 +1818,282 @@ public class PerformanceEvaluation extends Configured implements Tool {
"clients (and HRegionServers)");
System.err.println(" running: 1 <= value <= 500");
System.err.println("Examples:");
- System.err.println(" To run a single evaluation client:");
- System.err.println(" $ bin/hbase " + this.getClass().getName()
- + " sequentialWrite 1");
+ System.err.println(" To run a single client doing the default 1M sequentialWrites:");
+ System.err.println(" $ bin/hbase " + className + " sequentialWrite 1");
+ System.err.println(" To run 10 clients doing increments over ten rows:");
+ System.err.println(" $ bin/hbase " + className + " --rows=10 --nomapred increment 10");
}
- private static int getNumClients(final int start, final String[] args) {
- if(start + 1 > args.length) {
- throw new IllegalArgumentException("must supply the number of clients");
- }
- int N = Integer.parseInt(args[start]);
- if (N < 1) {
- throw new IllegalArgumentException("Number of clients must be > 1");
- }
- return N;
- }
+ /**
+ * Parse options passed in via an arguments array. Assumes that array has been split
+ * on white-space and placed into a {@code Queue}. Parsing stops at the first help flag
+ * ({@code -h} or anything starting with {@code --h}), which is put back on the queue so
+ * the caller can tell that parsing was incomplete, or at the first command name, which
+ * must be followed by the number of clients. Anything after the client count is left in
+ * the queue; it's up to the caller to deal with it. An argument that is neither a known
+ * option nor a command is reported through {@code printUsageAndExit}.
+ *
+ * @param args the command-line arguments; consumed from the head as they are parsed
+ * @return the options populated from the recognized arguments
+ * @throws IllegalArgumentException if an option value cannot be parsed, the client count
+ * is missing or not positive, or both {@code --rows} and {@code --size} were set
+ * to non-default values
+ * @throws IllegalStateException if both {@code --valueRandom} and {@code --valueZipf}
+ * are given
+ */
+ static TestOptions parseOpts(Queue<String> args) {
+ TestOptions opts = new TestOptions();
+
+ String cmd = null;
+ while ((cmd = args.poll()) != null) {
+ if (cmd.equals("-h") || cmd.startsWith("--h")) {
+ // place item back onto queue so that caller knows parsing was incomplete
+ args.add(cmd);
+ break;
+ }
- public int run(String[] args) throws Exception {
- // Process command-line args. TODO: Better cmd-line processing
- // (but hopefully something not as painful as cli options).
- int errCode = -1;
- if (args.length < 1) {
- printUsage();
- return errCode;
- }
+ final String nmr = "--nomapred";
+ if (cmd.startsWith(nmr)) {
+ opts.nomapred = true;
+ continue;
+ }
- try {
- // MR-NOTE: if you are adding a property that is used to control an operation
- // like put(), get(), scan(), ... you must also add it as part of the MR
- // input, take a look at writeInputFile().
- // Then you must adapt the LINE_PATTERN input regex,
- // and parse the argument, take a look at PEInputFormat.getSplits().
-
- TestOptions opts = new TestOptions();
-
- for (int i = 0; i < args.length; i++) {
- String cmd = args[i];
- if (cmd.equals("-h") || cmd.startsWith("--h")) {
- printUsage();
- errCode = 0;
- break;
- }
+ final String rows = "--rows=";
+ if (cmd.startsWith(rows)) {
+ opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
+ continue;
+ }
- final String nmr = "--nomapred";
- if (cmd.startsWith(nmr)) {
- opts.nomapred = true;
- continue;
- }
+ final String sampleRate = "--sampleRate=";
+ if (cmd.startsWith(sampleRate)) {
+ opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
+ continue;
+ }
- final String rows = "--rows=";
- if (cmd.startsWith(rows)) {
- opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
- continue;
- }
+ final String table = "--table=";
+ if (cmd.startsWith(table)) {
+ opts.tableName = cmd.substring(table.length());
+ continue;
+ }
- final String sampleRate = "--sampleRate=";
- if (cmd.startsWith(sampleRate)) {
- opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
- continue;
- }
+ final String startRow = "--startRow=";
+ if (cmd.startsWith(startRow)) {
+ opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
+ continue;
+ }
- final String table = "--table=";
- if (cmd.startsWith(table)) {
- opts.tableName = cmd.substring(table.length());
- continue;
- }
+ final String compress = "--compress=";
+ if (cmd.startsWith(compress)) {
+ opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
+ continue;
+ }
- final String compress = "--compress=";
- if (cmd.startsWith(compress)) {
- opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
- continue;
- }
+ final String traceRate = "--traceRate=";
+ if (cmd.startsWith(traceRate)) {
+ opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
+ continue;
+ }
- final String blockEncoding = "--blockEncoding=";
- if (cmd.startsWith(blockEncoding)) {
- opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
- continue;
- }
+ final String blockEncoding = "--blockEncoding=";
+ if (cmd.startsWith(blockEncoding)) {
+ opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
+ continue;
+ }
- final String flushCommits = "--flushCommits=";
- if (cmd.startsWith(flushCommits)) {
- opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
- continue;
- }
+ final String flushCommits = "--flushCommits=";
+ if (cmd.startsWith(flushCommits)) {
+ opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
+ continue;
+ }
- final String writeToWAL = "--writeToWAL=";
- if (cmd.startsWith(writeToWAL)) {
- opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
- continue;
- }
+ final String writeToWAL = "--writeToWAL=";
+ if (cmd.startsWith(writeToWAL)) {
+ opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
+ continue;
+ }
- final String presplit = "--presplit=";
- if (cmd.startsWith(presplit)) {
- opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
- continue;
- }
+ final String presplit = "--presplit=";
+ if (cmd.startsWith(presplit)) {
+ opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
+ continue;
+ }
- final String inMemory = "--inmemory=";
- if (cmd.startsWith(inMemory)) {
- opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
- continue;
- }
+ final String inMemory = "--inmemory=";
+ if (cmd.startsWith(inMemory)) {
+ opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
+ continue;
+ }
- final String latency = "--latency";
- if (cmd.startsWith(latency)) {
- opts.reportLatency = true;
- continue;
- }
+ final String autoFlush = "--autoFlush=";
+ if (cmd.startsWith(autoFlush)) {
+ opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
+ continue;
+ }
- final String multiGet = "--multiGet=";
- if (cmd.startsWith(multiGet)) {
- opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
- continue;
- }
+ final String onceCon = "--oneCon=";
+ if (cmd.startsWith(onceCon)) {
+ opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
+ continue;
+ }
- final String useTags = "--usetags=";
- if (cmd.startsWith(useTags)) {
- opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
- continue;
- }
-
- final String noOfTags = "--numoftags=";
- if (cmd.startsWith(noOfTags)) {
- opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
- continue;
- }
+ final String latency = "--latency";
+ if (cmd.startsWith(latency)) {
+ opts.reportLatency = true;
+ continue;
+ }
- final String filterOutAll = "--filterAll";
- if (cmd.startsWith(filterOutAll)) {
- opts.filterAll = true;
- continue;
- }
+ final String multiGet = "--multiGet=";
+ if (cmd.startsWith(multiGet)) {
+ opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
+ continue;
+ }
+
+ final String useTags = "--usetags=";
+ if (cmd.startsWith(useTags)) {
+ opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
+ continue;
+ }
+
+ final String noOfTags = "--numoftags=";
+ if (cmd.startsWith(noOfTags)) {
+ opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
+ continue;
+ }
+
+ final String filterOutAll = "--filterAll";
+ if (cmd.startsWith(filterOutAll)) {
+ opts.filterAll = true;
+ continue;
+ }
+
+ final String size = "--size=";
+ if (cmd.startsWith(size)) {
+ opts.size = Float.parseFloat(cmd.substring(size.length()));
+ continue;
+ }
+
+ final String splitPolicy = "--splitPolicy=";
+ if (cmd.startsWith(splitPolicy)) {
+ opts.splitPolicy = cmd.substring(splitPolicy.length());
+ continue;
+ }
+
+ final String randomSleep = "--randomSleep=";
+ if (cmd.startsWith(randomSleep)) {
+ opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
+ continue;
+ }
- final String bloomFilter = "--bloomFilter";
- if (cmd.startsWith(bloomFilter)) {
- opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
- continue;
+ final String bloomFilter = "--bloomFilter=";
+ if (cmd.startsWith(bloomFilter)) {
+ opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
+ continue;
+ }
+
+ final String valueSize = "--valueSize=";
+ if (cmd.startsWith(valueSize)) {
+ opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
+ continue;
+ }
+
+ final String valueRandom = "--valueRandom";
+ if (cmd.startsWith(valueRandom)) {
+ opts.valueRandom = true;
+ if (opts.valueZipf) {
+ throw new IllegalStateException("Either valueZipf or valueRandom but not both");
}
+ continue;
+ }
- final String addColumns = "--addColumns=";
- if (cmd.startsWith(addColumns)) {
- opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
- continue;
+ final String valueZipf = "--valueZipf";
+ if (cmd.startsWith(valueZipf)) {
+ opts.valueZipf = true;
+ if (opts.valueRandom) {
+ throw new IllegalStateException("Either valueZipf or valueRandom but not both");
}
+ continue;
+ }
+
+ final String period = "--period=";
+ if (cmd.startsWith(period)) {
+ opts.period = Integer.parseInt(cmd.substring(period.length()));
+ continue;
+ }
+
+ final String addColumns = "--addColumns=";
+ if (cmd.startsWith(addColumns)) {
+ opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
+ continue;
+ }
+
+ final String columns = "--columns=";
+ if (cmd.startsWith(columns)) {
+ opts.columns = Integer.parseInt(cmd.substring(columns.length()));
+ continue;
+ }
+
+ final String caching = "--caching=";
+ if (cmd.startsWith(caching)) {
+ opts.caching = Integer.parseInt(cmd.substring(caching.length()));
+ continue;
+ }
- Class<? extends Test> cmdClass = determineCommandClass(cmd);
- if (cmdClass != null) {
- opts.numClientThreads = getNumClients(i + 1, args);
+ if (isCommandClass(cmd)) {
+ opts.cmdName = cmd;
+ // the client count must directly follow the command name
+ String numClients = args.poll();
+ if (numClients == null) {
+ throw new IllegalArgumentException("must supply the number of clients");
+ }
+ opts.numClientThreads = Integer.parseInt(numClients);
+ // reject early: the count is used as a divisor below
+ if (opts.numClientThreads <= 0) {
+ throw new IllegalArgumentException("Number of clients must be > 0");
+ }
+ int rowsPerGB = getRowsPerGB(opts);
+ if (opts.size != DEFAULT_OPTS.size &&
+ opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
+ throw new IllegalArgumentException(rows + " and " + size + " are mutually exclusive arguments.");
+ }
+ if (opts.size != DEFAULT_OPTS.size) {
+ // total size in GB specified
+ // multiply before narrowing so that fractional sizes such as --size=0.5 are honoured
+ opts.totalRows = (int) (opts.size * rowsPerGB);
+ opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
+ } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
// number of rows specified
opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
- runTest(cmdClass, opts);
- errCode = 0;
- break;
+ // floating-point division keeps the fractional part of the size
+ opts.size = (float) opts.totalRows / rowsPerGB;
}
+ break;
+ } else {
+ printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
+ }
+
+ // Only reached if printUsageAndExit returns.
+ // Not matching any option or command.
+ System.err.println("Error: Wrong option or command: " + cmd);
+ args.add(cmd);
+ break;
+ }
+ return opts;
+ }
+
+ /**
+ * Computes how many rows make up {@code ONE_GB} for the given options. The size of a
+ * row is taken as the value size times the number of columns; when
+ * {@code valueRandom} is set, half of {@code valueSize} is used as the value size.
+ *
+ * @param opts the options supplying {@code valueSize}, {@code valueRandom} and the
+ * column count
+ * @return {@code ONE_GB} divided by the computed row size
+ * @throws IllegalArgumentException if the computed row size is not positive, which
+ * would otherwise surface as a division by zero or a negative row count
+ */
+ static int getRowsPerGB(final TestOptions opts) {
+ int valueBytes = opts.valueRandom ? opts.valueSize / 2 : opts.valueSize;
+ int rowBytes = valueBytes * opts.getColumns();
+ if (rowBytes <= 0) {
+ throw new IllegalArgumentException("valueSize and columns must give a positive row size:"
+ + " valueSize=" + opts.valueSize + ", columns=" + opts.getColumns());
+ }
+ return ONE_GB / rowBytes;
+ }
+
+ /**
+ * Parses the command-line arguments and runs the selected test.
+ *
+ * @param args the command-line arguments
+ * @return 0 when the test ran or when arguments were left unparsed (the help case);
+ * -1 when no arguments or no command were given, or when an exception was
+ * caught (its stack trace is printed)
+ */
+ @Override
+ public int run(String[] args) throws Exception {
+ // Process command-line args. TODO: Better cmd-line processing
+ // (but hopefully something not as painful as cli options).
+ int errCode = -1;
+ if (args.length < 1) {
+ printUsage();
+ return errCode;
+ }
+ try {
+ LinkedList<String> argv = new LinkedList<String>();
+ argv.addAll(Arrays.asList(args));
+ TestOptions opts = parseOpts(argv);
+
+ // args remaining, print help and exit
+ if (!argv.isEmpty()) {
+ errCode = 0;
printUsage();
- break;
+ return errCode;
}
+
+ // parseOpts only assigns cmdName when it finds a command; with options alone there
+ // is nothing to run, so show the usage instead of looking up a missing name
+ if (opts.cmdName == null) {
+ printUsage();
+ return errCode;
+ }
+
+ // must run at least 1 client
+ if (opts.numClientThreads <= 0) {
+ throw new IllegalArgumentException("Number of clients must be > 0");
+ }
+
+ Class<? extends Test> cmdClass = determineCommandClass(opts.cmdName);
+ if (cmdClass != null) {
+ runTest(cmdClass, opts);
+ errCode = 0;
+ }
+
} catch (Exception e) {
e.printStackTrace();
}
@@ -1315,8 +2101,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
return errCode;
}
- private Class<? extends Test> determineCommandClass(String cmd) {
- CmdDescriptor descriptor = commands.get(cmd);
+ /**
+ * Tells whether the given argument is the name of a registered command.
+ *
+ * @param cmd the argument to check
+ * @return true when {@code COMMANDS} has an entry keyed by {@code cmd}
+ */
+ private static boolean isCommandClass(String cmd) {
+ final boolean registered = COMMANDS.containsKey(cmd);
+ return registered;
+ }
+
+ /**
+ * Looks up the test class registered for a command name.
+ *
+ * @param cmd the command name to look up in {@code COMMANDS}
+ * @return the class held by the matching descriptor, or {@code null} when
+ * {@code COMMANDS} has no descriptor for {@code cmd}
+ */
+ private static Class<? extends Test> determineCommandClass(String cmd) {
+ CmdDescriptor descriptor = COMMANDS.get(cmd);
return descriptor != null ? descriptor.getCmdClass() : null;
}