You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2014/02/26 22:59:59 UTC
svn commit: r1572289 -
/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
Author: ndimiduk
Date: Wed Feb 26 21:59:58 2014
New Revision: 1572289
URL: http://svn.apache.org/r1572289
Log:
HBASE-10592 Refactor PerformanceEvaluation tool
Modified:
hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=1572289&r1=1572288&r2=1572289&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java Wed Feb 26 21:59:58 2014
@@ -18,8 +18,6 @@
*/
package org.apache.hadoop.hbase;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
@@ -30,20 +28,21 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
-import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Durability;
@@ -70,21 +69,17 @@ import org.apache.hadoop.hbase.util.Hash
import org.apache.hadoop.hbase.util.MurmurHash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
-import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import static org.codehaus.jackson.map.SerializationConfig.Feature.SORT_PROPERTIES_ALPHABETICALLY;
/**
* Script used evaluating HBase performance and scalability. Runs a HBase
@@ -105,7 +100,7 @@ import org.apache.hadoop.util.ToolRunner
public class PerformanceEvaluation extends Configured implements Tool {
protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
- public static final TableName TABLE_NAME = TableName.valueOf("TestTable");
+ public static final String TABLE_NAME = "TestTable";
public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
public static final int VALUE_LENGTH = 1000;
@@ -119,44 +114,12 @@ public class PerformanceEvaluation exten
private static final MathContext CXT = MathContext.DECIMAL64;
private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
+ private static final TestOptions DEFAULT_OPTS = new TestOptions();
- protected HTableDescriptor TABLE_DESCRIPTOR;
protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
- private boolean nomapred = false;
- private int N = 1;
- private int R = ROWS_PER_GB;
- private float sampleRate = 1.0f;
- private TableName tableName = TABLE_NAME;
- private Compression.Algorithm compression = Compression.Algorithm.NONE;
- private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
- private boolean flushCommits = true;
- private boolean writeToWAL = true;
- private boolean inMemoryCF = false;
- private boolean reportLatency = false;
- private int presplitRegions = 0;
- private boolean useTags = false;
- private int noOfTags = 1;
- private int multiGet = 0;
- private HConnection connection;
-
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
- /** Regex to parse lines in input file passed to mapreduce task. */
- public static final Pattern LINE_PATTERN =
- Pattern.compile("tableName=(\\w+),\\s+" +
- "startRow=(\\d+),\\s+" +
- "perClientRunRows=(\\d+),\\s+" +
- "totalRows=(\\d+),\\s+" +
- "sampleRate=([-+]?[0-9]*\\.?[0-9]+),\\s+" +
- "clients=(\\d+),\\s+" +
- "flushCommits=(\\w+),\\s+" +
- "writeToWAL=(\\w+),\\s+" +
- "useTags=(\\w+),\\s+" +
- "noOfTags=(\\d+),\\s+" +
- "reportLatency=(\\w+),\\s+" +
- "multiGet=(\\d+)");
-
/**
* Enum for map metrics. Keep it out here rather than inside in the Map
* inner-class so we can find associated properties.
@@ -219,272 +182,10 @@ public class PerformanceEvaluation exten
}
/**
- * This class works as the InputSplit of Performance Evaluation
- * MapReduce InputFormat, and the Record Value of RecordReader.
- * Each map task will only read one record from a PeInputSplit,
- * the record value is the PeInputSplit itself.
- */
- public static class PeInputSplit extends InputSplit implements Writable {
- private TableName tableName = TABLE_NAME;
- private int startRow = 0;
- private int rows = 0;
- private int totalRows = 0;
- private float sampleRate = 1.0f;
- private int clients = 0;
- private boolean flushCommits = false;
- private boolean writeToWAL = true;
- private boolean useTags = false;
- private int noOfTags = 0;
- private boolean reportLatency = false;
- private int multiGet = 0;
-
- public PeInputSplit() {}
-
- public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows,
- float sampleRate, int clients, boolean flushCommits, boolean writeToWAL,
- boolean useTags, int noOfTags, boolean reportLatency, int multiGet) {
- this.tableName = tableName;
- this.startRow = startRow;
- this.rows = rows;
- this.totalRows = totalRows;
- this.sampleRate = sampleRate;
- this.clients = clients;
- this.flushCommits = flushCommits;
- this.writeToWAL = writeToWAL;
- this.useTags = useTags;
- this.noOfTags = noOfTags;
- this.reportLatency = reportLatency;
- this.multiGet = multiGet;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int tableNameLen = in.readInt();
- byte[] name = new byte[tableNameLen];
- in.readFully(name);
- this.tableName = TableName.valueOf(name);
-
- this.startRow = in.readInt();
- this.rows = in.readInt();
- this.totalRows = in.readInt();
- this.sampleRate = in.readFloat();
- this.clients = in.readInt();
- this.flushCommits = in.readBoolean();
- this.writeToWAL = in.readBoolean();
- this.useTags = in.readBoolean();
- this.noOfTags = in.readInt();
- this.reportLatency = in.readBoolean();
- this.multiGet = in.readInt();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- byte[] name = this.tableName.toBytes();
- out.writeInt(name.length);
- out.write(name);
- out.writeInt(startRow);
- out.writeInt(rows);
- out.writeInt(totalRows);
- out.writeFloat(sampleRate);
- out.writeInt(clients);
- out.writeBoolean(flushCommits);
- out.writeBoolean(writeToWAL);
- out.writeBoolean(useTags);
- out.writeInt(noOfTags);
- out.writeBoolean(reportLatency);
- out.writeInt(multiGet);
- }
-
- @Override
- public long getLength() throws IOException, InterruptedException {
- return 0;
- }
-
- @Override
- public String[] getLocations() throws IOException, InterruptedException {
- return new String[0];
- }
-
- public TableName getTableName() {
- return tableName;
- }
-
- public int getStartRow() {
- return startRow;
- }
-
- public int getRows() {
- return rows;
- }
-
- public int getTotalRows() {
- return totalRows;
- }
-
- public float getSampleRate() {
- return sampleRate;
- }
-
- public int getClients() {
- return clients;
- }
-
- public boolean isFlushCommits() {
- return flushCommits;
- }
-
- public boolean isWriteToWAL() {
- return writeToWAL;
- }
-
- public boolean isUseTags() {
- return useTags;
- }
-
- public int getNoOfTags() {
- return noOfTags;
- }
-
- public boolean isReportLatency() {
- return reportLatency;
- }
-
- public int getMultiGet() {
- return multiGet;
- }
- }
-
- /**
- * InputFormat of Performance Evaluation MapReduce job.
- * It extends from FileInputFormat, want to use it's methods such as setInputPaths().
- */
- public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> {
-
- @Override
- public List<InputSplit> getSplits(JobContext job) throws IOException {
- // generate splits
- List<InputSplit> splitList = new ArrayList<InputSplit>();
-
- for (FileStatus file: listStatus(job)) {
- if (file.isDir()) {
- continue;
- }
- Path path = file.getPath();
- FileSystem fs = path.getFileSystem(job.getConfiguration());
- FSDataInputStream fileIn = fs.open(path);
- LineReader in = new LineReader(fileIn, job.getConfiguration());
- int lineLen = 0;
- while(true) {
- Text lineText = new Text();
- lineLen = in.readLine(lineText);
- if(lineLen <= 0) {
- break;
- }
- Matcher m = LINE_PATTERN.matcher(lineText.toString());
- if((m != null) && m.matches()) {
- TableName tableName = TableName.valueOf(m.group(1));
- int startRow = Integer.parseInt(m.group(2));
- int rows = Integer.parseInt(m.group(3));
- int totalRows = Integer.parseInt(m.group(4));
- float sampleRate = Float.parseFloat(m.group(5));
- int clients = Integer.parseInt(m.group(6));
- boolean flushCommits = Boolean.parseBoolean(m.group(7));
- boolean writeToWAL = Boolean.parseBoolean(m.group(8));
- boolean useTags = Boolean.parseBoolean(m.group(9));
- int noOfTags = Integer.parseInt(m.group(10));
- boolean reportLatency = Boolean.parseBoolean(m.group(11));
- int multiGet = Integer.parseInt(m.group(12));
-
- LOG.debug("tableName=" + tableName +
- " split["+ splitList.size() + "] " +
- " startRow=" + startRow +
- " rows=" + rows +
- " totalRows=" + totalRows +
- " sampleRate=" + sampleRate +
- " clients=" + clients +
- " flushCommits=" + flushCommits +
- " writeToWAL=" + writeToWAL +
- " useTags=" + useTags +
- " noOfTags=" + noOfTags +
- " reportLatency=" + reportLatency +
- " multiGet=" + multiGet);
-
- PeInputSplit newSplit =
- new PeInputSplit(tableName, startRow, rows, totalRows, sampleRate, clients,
- flushCommits, writeToWAL, useTags, noOfTags, reportLatency, multiGet);
- splitList.add(newSplit);
- }
- }
- in.close();
- }
-
- LOG.info("Total # of splits: " + splitList.size());
- return splitList;
- }
-
- @Override
- public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split,
- TaskAttemptContext context) {
- return new PeRecordReader();
- }
-
- public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> {
- private boolean readOver = false;
- private PeInputSplit split = null;
- private NullWritable key = null;
- private PeInputSplit value = null;
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- this.readOver = false;
- this.split = (PeInputSplit)split;
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if(readOver) {
- return false;
- }
-
- key = NullWritable.get();
- value = split;
-
- readOver = true;
- return true;
- }
-
- @Override
- public NullWritable getCurrentKey() throws IOException, InterruptedException {
- return key;
- }
-
- @Override
- public PeInputSplit getCurrentValue() throws IOException, InterruptedException {
- return value;
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- if(readOver) {
- return 1.0f;
- } else {
- return 0.0f;
- }
- }
-
- @Override
- public void close() throws IOException {
- // do nothing
- }
- }
- }
-
- /**
* MapReduce job that runs a performance evaluation client in each map task.
*/
public static class EvaluationMapTask
- extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> {
+ extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
/** configuration parameter name that contains the command */
public final static String CMD_KEY = "EvaluationMapTask.command";
@@ -511,16 +212,14 @@ public class PerformanceEvaluation exten
}
private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
- Class<? extends Type> clazz = null;
try {
- clazz = Class.forName(className).asSubclass(type);
+ return Class.forName(className).asSubclass(type);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Could not find class for name: " + className, e);
}
- return clazz;
}
- protected void map(NullWritable key, PeInputSplit value, final Context context)
+ protected void map(LongWritable key, Text value, final Context context)
throws IOException, InterruptedException {
Status status = new Status() {
@@ -529,18 +228,17 @@ public class PerformanceEvaluation exten
}
};
+ ObjectMapper mapper = new ObjectMapper();
+ TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
+ Configuration conf = HBaseConfiguration.create(context.getConfiguration());
+
// Evaluation task
- pe.tableName = value.getTableName();
- long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
- value.getRows(), value.getTotalRows(), value.getSampleRate(),
- value.isFlushCommits(), value.isWriteToWAL(), value.isUseTags(),
- value.getNoOfTags(), value.isReportLatency(), value.getMultiGet(),
- HConnectionManager.createConnection(context.getConfiguration()), status);
+ long elapsedTime = this.pe.runOneClient(this.cmd, conf, opts, status);
// Collect how much time the thing took. Report as map output and
// to the ELAPSED_TIME counter.
context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
- context.getCounter(Counter.ROWS).increment(value.rows);
- context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime));
+ context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
+ context.write(new LongWritable(opts.startRow), new LongWritable(elapsedTime));
context.progress();
}
}
@@ -551,21 +249,21 @@ public class PerformanceEvaluation exten
* @return True if we created the table.
* @throws IOException
*/
- private boolean checkTable(HBaseAdmin admin) throws IOException {
- HTableDescriptor tableDescriptor = getTableDescriptor();
- if (this.presplitRegions > 0) {
+ private static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
+ HTableDescriptor tableDescriptor = getTableDescriptor(opts);
+ if (opts.presplitRegions > 0) {
// presplit requested
if (admin.tableExists(tableDescriptor.getTableName())) {
admin.disableTable(tableDescriptor.getTableName());
admin.deleteTable(tableDescriptor.getTableName());
}
- byte[][] splits = getSplits();
+ byte[][] splits = getSplits(opts);
for (int i=0; i < splits.length; i++) {
LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
}
admin.createTable(tableDescriptor, splits);
- LOG.info ("Table created with " + this.presplitRegions + " splits");
+ LOG.info ("Table created with " + opts.presplitRegions + " splits");
}
else {
boolean tableExists = admin.tableExists(tableDescriptor.getTableName());
@@ -577,33 +275,32 @@ public class PerformanceEvaluation exten
return admin.tableExists(tableDescriptor.getTableName());
}
- protected HTableDescriptor getTableDescriptor() {
- if (TABLE_DESCRIPTOR == null) {
- TABLE_DESCRIPTOR = new HTableDescriptor(tableName);
- HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
- family.setDataBlockEncoding(blockEncoding);
- family.setCompressionType(compression);
- if (inMemoryCF) {
- family.setInMemory(true);
- }
- TABLE_DESCRIPTOR.addFamily(family);
+ /**
+ * Create an HTableDescriptor from provided TestOptions.
+ */
+ protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
+ HTableDescriptor desc = new HTableDescriptor(opts.tableName);
+ HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
+ family.setDataBlockEncoding(opts.blockEncoding);
+ family.setCompressionType(opts.compression);
+ if (opts.inMemoryCF) {
+ family.setInMemory(true);
}
- return TABLE_DESCRIPTOR;
+ desc.addFamily(family);
+ return desc;
}
/**
* generates splits based on total number of rows and specified split regions
- *
- * @return splits : array of byte []
*/
- protected byte[][] getSplits() {
- if (this.presplitRegions == 0)
+ protected static byte[][] getSplits(TestOptions opts) {
+ if (opts.presplitRegions == 0)
return new byte [0][];
- int numSplitPoints = presplitRegions - 1;
+ int numSplitPoints = opts.presplitRegions - 1;
byte[][] splits = new byte[numSplitPoints][];
- int jump = this.R / this.presplitRegions;
- for (int i=0; i < numSplitPoints; i++) {
+ int jump = opts.totalRows / opts.presplitRegions;
+ for (int i = 0; i < numSplitPoints; i++) {
int rowkey = jump * (1 + i);
splits[i] = format(rowkey);
}
@@ -611,90 +308,40 @@ public class PerformanceEvaluation exten
}
/*
- * We're to run multiple clients concurrently. Setup a mapreduce job. Run
- * one map per client. Then run a single reduce to sum the elapsed times.
- * @param cmd Command to run.
- * @throws IOException
- */
- private void runNIsMoreThanOne(final Class<? extends Test> cmd)
- throws IOException, InterruptedException, ClassNotFoundException {
- checkTable(new HBaseAdmin(getConf()));
- if (this.nomapred) {
- doMultipleClients(cmd);
- } else {
- doMapReduce(cmd);
- }
- }
-
- /*
* Run all clients in this vm each to its own thread.
* @param cmd Command to run.
* @throws IOException
*/
- private void doMultipleClients(final Class<? extends Test> cmd) throws IOException {
- final List<Thread> threads = new ArrayList<Thread>(this.N);
- final long[] timings = new long[this.N];
- final int perClientRows = R/N;
- final float sampleRate = this.sampleRate;
- final TableName tableName = this.tableName;
- final DataBlockEncoding encoding = this.blockEncoding;
- final boolean flushCommits = this.flushCommits;
- final Compression.Algorithm compression = this.compression;
- final boolean writeToWal = this.writeToWAL;
- final boolean reportLatency = this.reportLatency;
- final int preSplitRegions = this.presplitRegions;
- final boolean useTags = this.useTags;
- final int numTags = this.noOfTags;
- final int multiGet = this.multiGet;
- final HConnection connection = HConnectionManager.createConnection(getConf());
- for (int i = 0; i < this.N; i++) {
+ private void doLocalClients(final Class<? extends Test> cmd, final TestOptions opts)
+ throws IOException, InterruptedException {
+ Future<Long>[] threads = new Future[opts.numClientThreads];
+ long[] timings = new long[opts.numClientThreads];
+ ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
+ new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
+ for (int i = 0; i < threads.length; i++) {
final int index = i;
- Thread t = new Thread ("TestClient-" + i) {
+ threads[i] = pool.submit(new Callable<Long>() {
@Override
- public void run() {
- super.run();
- PerformanceEvaluation pe = new PerformanceEvaluation(getConf());
- pe.tableName = tableName;
- pe.blockEncoding = encoding;
- pe.flushCommits = flushCommits;
- pe.compression = compression;
- pe.writeToWAL = writeToWal;
- pe.presplitRegions = preSplitRegions;
- pe.N = N;
- pe.sampleRate = sampleRate;
- pe.reportLatency = reportLatency;
- pe.connection = connection;
- pe.useTags = useTags;
- pe.noOfTags = numTags;
- pe.multiGet = multiGet;
- try {
- long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
- perClientRows, R, sampleRate, flushCommits, writeToWal, useTags,
- noOfTags, reportLatency, multiGet, connection, new Status() {
- public void setStatus(final String msg) throws IOException {
- LOG.info("client-" + getName() + " " + msg);
- }
- });
- timings[index] = elapsedTime;
- LOG.info("Finished " + getName() + " in " + elapsedTime +
- "ms writing " + perClientRows + " rows");
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ public Long call() throws Exception {
+ TestOptions threadOpts = new TestOptions(opts);
+ threadOpts.startRow = index * threadOpts.perClientRunRows;
+ long elapsedTime = runOneClient(cmd, getConf(), threadOpts, new Status() {
+ public void setStatus(final String msg) throws IOException {
+ LOG.info("client-" + Thread.currentThread().getName() + " " + msg);
+ }
+ });
+ LOG.info("Finished " + Thread.currentThread().getName() + " in " + elapsedTime +
+ "ms over " + threadOpts.perClientRunRows + " rows");
+ return elapsedTime;
}
- };
- threads.add(t);
+ });
}
- for (Thread t: threads) {
- t.start();
- }
- for (Thread t: threads) {
- while(t.isAlive()) {
- try {
- t.join();
- } catch (InterruptedException e) {
- LOG.debug("Interrupted, continuing" + e.toString());
- }
+ pool.shutdown();
+ for (int i = 0; i < threads.length; i++) {
+ try {
+ timings[i] = threads[i].get();
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
}
}
final String test = cmd.getSimpleName();
@@ -702,13 +349,13 @@ public class PerformanceEvaluation exten
+ Arrays.toString(timings));
Arrays.sort(timings);
long total = 0;
- for (int i = 0; i < this.N; i++) {
+ for (int i = 0; i < timings.length; i++) {
total += timings[i];
}
LOG.info("[" + test + "]"
+ "\tMin: " + timings[0] + "ms"
- + "\tMax: " + timings[this.N - 1] + "ms"
- + "\tAvg: " + (total / this.N) + "ms");
+ + "\tMax: " + timings[timings.length - 1] + "ms"
+ + "\tAvg: " + (total / timings.length) + "ms");
}
/*
@@ -718,18 +365,20 @@ public class PerformanceEvaluation exten
* @param cmd Command to run.
* @throws IOException
*/
- private void doMapReduce(final Class<? extends Test> cmd) throws IOException,
+ private void doMapReduce(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = getConf();
- Path inputDir = writeInputFile(conf);
+ Path inputDir = writeInputFile(conf, opts);
conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
Job job = new Job(conf);
job.setJarByClass(PerformanceEvaluation.class);
job.setJobName("HBase Performance Evaluation");
- job.setInputFormatClass(PeInputFormat.class);
- PeInputFormat.setInputPaths(job, inputDir);
+ job.setInputFormatClass(NLineInputFormat.class);
+ NLineInputFormat.setInputPaths(job, inputDir);
+ // this is default, but be explicit about it just in case.
+ NLineInputFormat.setNumLinesPerSplit(job, 1);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);
@@ -743,9 +392,9 @@ public class PerformanceEvaluation exten
TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
TableMapReduceUtil.addDependencyJars(job);
- // Add a Class from the hbase.jar so it gets registered too.
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
- org.apache.hadoop.hbase.util.Bytes.class);
+ DescriptiveStatistics.class, // commons-math
+ ObjectMapper.class); // jackson-mapper-asl
TableMapReduceUtil.initCredentials(job);
@@ -758,7 +407,7 @@ public class PerformanceEvaluation exten
* @return Directory that contains file written.
* @throws IOException
*/
- private Path writeInputFile(final Configuration c) throws IOException {
+ private Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
Path inputDir = new Path(jobdir, "inputs");
@@ -771,22 +420,16 @@ public class PerformanceEvaluation exten
// Make input random.
Map<Integer, String> m = new TreeMap<Integer, String>();
Hash h = MurmurHash.getInstance();
- int perClientRows = (this.R / this.N);
+ int perClientRows = (opts.totalRows / opts.numClientThreads);
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
try {
for (int i = 0; i < 10; i++) {
- for (int j = 0; j < N; j++) {
- String s = "tableName=" + this.tableName +
- ", startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) +
- ", perClientRunRows=" + (perClientRows / 10) +
- ", totalRows=" + this.R +
- ", sampleRate=" + this.sampleRate +
- ", clients=" + this.N +
- ", flushCommits=" + this.flushCommits +
- ", writeToWAL=" + this.writeToWAL +
- ", useTags=" + this.useTags +
- ", noOfTags=" + this.noOfTags +
- ", reportLatency=" + this.reportLatency +
- ", multiGet=" + this.multiGet;
+ for (int j = 0; j < opts.numClientThreads; j++) {
+ TestOptions next = new TestOptions(opts);
+ next.startRow = (j * perClientRows) + (i * (perClientRows/10));
+ next.perClientRunRows = perClientRows / 10;
+ String s = mapper.writeValueAsString(next);
int hash = h.hash(Bytes.toBytes(s));
m.put(hash, s);
}
@@ -828,95 +471,50 @@ public class PerformanceEvaluation exten
}
/**
- * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation.Test
- * tests}. This makes the reflection logic a little easier to understand...
+ * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
+ * This makes tracking all these arguments a little easier.
*/
static class TestOptions {
- private int startRow;
- private int perClientRunRows;
- private int totalRows;
- private float sampleRate;
- private int numClientThreads;
- private TableName tableName;
- private boolean flushCommits;
- private boolean writeToWAL = true;
- private boolean useTags = false;
- private int noOfTags = 0;
- private boolean reportLatency;
- private int multiGet = 0;
- private HConnection connection;
-
- TestOptions() {}
- TestOptions(int startRow, int perClientRunRows, int totalRows, float sampleRate,
- int numClientThreads, TableName tableName, boolean flushCommits, boolean writeToWAL,
- boolean useTags, int noOfTags, boolean reportLatency, int multiGet,
- HConnection connection) {
- this.startRow = startRow;
- this.perClientRunRows = perClientRunRows;
- this.totalRows = totalRows;
- this.sampleRate = sampleRate;
- this.numClientThreads = numClientThreads;
- this.tableName = tableName;
- this.flushCommits = flushCommits;
- this.writeToWAL = writeToWAL;
- this.useTags = useTags;
- this.noOfTags = noOfTags;
- this.reportLatency = reportLatency;
- this.multiGet = multiGet;
- this.connection = connection;
- }
-
- public int getStartRow() {
- return startRow;
- }
+ public TestOptions() {}
- public int getPerClientRunRows() {
- return perClientRunRows;
- }
-
- public int getTotalRows() {
- return totalRows;
- }
-
- public float getSampleRate() {
- return sampleRate;
- }
-
- public int getNumClientThreads() {
- return numClientThreads;
- }
-
- public TableName getTableName() {
- return tableName;
- }
-
- public boolean isFlushCommits() {
- return flushCommits;
- }
-
- public boolean isWriteToWAL() {
- return writeToWAL;
- }
-
- public boolean isReportLatency() {
- return reportLatency;
- }
-
- public int getMultiGet() {
- return multiGet;
- }
-
- public HConnection getConnection() {
- return connection;
- }
-
- public boolean isUseTags() {
- return this.useTags;
- }
- public int getNumTags() {
- return this.noOfTags;
- }
+ public TestOptions(TestOptions that) {
+ this.nomapred = that.nomapred;
+ this.startRow = that.startRow;
+ this.perClientRunRows = that.perClientRunRows;
+ this.numClientThreads = that.numClientThreads;
+ this.totalRows = that.totalRows;
+ this.sampleRate = that.sampleRate;
+ this.tableName = that.tableName;
+ this.flushCommits = that.flushCommits;
+ this.writeToWAL = that.writeToWAL;
+ this.useTags = that.useTags;
+ this.noOfTags = that.noOfTags;
+ this.reportLatency = that.reportLatency;
+ this.multiGet = that.multiGet;
+ this.inMemoryCF = that.inMemoryCF;
+ this.presplitRegions = that.presplitRegions;
+ this.compression = that.compression;
+ this.blockEncoding = that.blockEncoding;
+ }
+
+ public boolean nomapred = false;
+ public int startRow = 0;
+ public int perClientRunRows = ROWS_PER_GB;
+ public int numClientThreads = 1;
+ public int totalRows = ROWS_PER_GB;
+ public float sampleRate = 1.0f;
+ public String tableName = TABLE_NAME;
+ public boolean flushCommits = true;
+ public boolean writeToWAL = true;
+ public boolean useTags = false;
+ public int noOfTags = 1;
+ public boolean reportLatency = false;
+ public int multiGet = 0;
+ boolean inMemoryCF = false;
+ int presplitRegions = 0;
+ public Compression.Algorithm compression = Compression.Algorithm.NONE;
+ public DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
}
/*
@@ -926,48 +524,26 @@ public class PerformanceEvaluation exten
static abstract class Test {
// Below is make it so when Tests are all running in the one
// jvm, that they each have a differently seeded Random.
- private static final Random randomSeed =
- new Random(System.currentTimeMillis());
+ private static final Random randomSeed = new Random(System.currentTimeMillis());
private static long nextRandomSeed() {
return randomSeed.nextLong();
}
protected final Random rand = new Random(nextRandomSeed());
+ protected final Configuration conf;
+ protected final TestOptions opts;
- protected final int startRow;
- protected final int perClientRunRows;
- protected final int totalRows;
- protected final float sampleRate;
private final Status status;
- protected TableName tableName;
- protected HTableInterface table;
- protected volatile Configuration conf;
- protected boolean flushCommits;
- protected boolean writeToWAL;
- protected boolean useTags;
- protected int noOfTags;
- protected boolean reportLatency;
protected HConnection connection;
+ protected HTableInterface table;
/**
* Note that all subclasses of this class must provide a public contructor
* that has the exact same list of arguments.
*/
Test(final Configuration conf, final TestOptions options, final Status status) {
- super();
- this.startRow = options.getStartRow();
- this.perClientRunRows = options.getPerClientRunRows();
- this.totalRows = options.getTotalRows();
- this.sampleRate = options.getSampleRate();
- this.status = status;
- this.tableName = options.getTableName();
- this.table = null;
this.conf = conf;
- this.flushCommits = options.isFlushCommits();
- this.writeToWAL = options.isWriteToWAL();
- this.useTags = options.isUseTags();
- this.noOfTags = options.getNumTags();
- this.reportLatency = options.isReportLatency();
- this.connection = options.getConnection();
+ this.opts = options;
+ this.status = status;
}
private String generateStatus(final int sr, final int i, final int lr) {
@@ -975,20 +551,22 @@ public class PerformanceEvaluation exten
}
protected int getReportingPeriod() {
- int period = this.perClientRunRows / 10;
- return period == 0 ? this.perClientRunRows : period;
+ int period = opts.perClientRunRows / 10;
+ return period == 0 ? opts.perClientRunRows : period;
}
void testSetup() throws IOException {
- this.table = connection.getTable(tableName);
+ this.connection = HConnectionManager.createConnection(conf);
+ this.table = connection.getTable(opts.tableName);
this.table.setAutoFlush(false, true);
}
void testTakedown() throws IOException {
- if (flushCommits) {
+ if (opts.flushCommits) {
this.table.flushCommits();
}
table.close();
+ connection.close();
}
/*
@@ -1012,12 +590,12 @@ public class PerformanceEvaluation exten
* Provides an extension point for tests that don't want a per row invocation.
*/
void testTimed() throws IOException {
- int lastRow = this.startRow + this.perClientRunRows;
+ int lastRow = opts.startRow + opts.perClientRunRows;
// Report on completion of 1/10th of total.
- for (int i = this.startRow; i < lastRow; i++) {
+ for (int i = opts.startRow; i < lastRow; i++) {
testRow(i);
if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
- status.setStatus(generateStatus(this.startRow, i, lastRow));
+ status.setStatus(generateStatus(opts.startRow, i, lastRow));
}
}
}
@@ -1038,7 +616,7 @@ public class PerformanceEvaluation exten
@Override
void testRow(final int i) throws IOException {
- Scan scan = new Scan(getRandomRow(this.rand, this.totalRows));
+ Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows));
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
ResultScanner s = this.table.getScanner(scan);
@@ -1048,8 +626,8 @@ public class PerformanceEvaluation exten
@Override
protected int getReportingPeriod() {
- int period = this.perClientRunRows / 100;
- return period == 0 ? this.perClientRunRows : period;
+ int period = opts.perClientRunRows / 100;
+ return period == 0 ? opts.perClientRunRows : period;
}
}
@@ -1083,15 +661,15 @@ public class PerformanceEvaluation exten
protected abstract Pair<byte[],byte[]> getStartAndStopRow();
protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
- int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows;
+ int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
int stop = start + maxRange;
return new Pair<byte[],byte[]>(format(start), format(stop));
}
@Override
protected int getReportingPeriod() {
- int period = this.perClientRunRows / 100;
- return period == 0? this.perClientRunRows: period;
+ int period = opts.perClientRunRows / 100;
+ return period == 0? opts.perClientRunRows: period;
}
}
@@ -1141,24 +719,20 @@ public class PerformanceEvaluation exten
static class RandomReadTest extends Test {
private final int everyN;
- private final boolean reportLatency;
private final double[] times;
- private final int multiGet;
private ArrayList<Get> gets;
int idx = 0;
RandomReadTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
- everyN = (int) (this.totalRows / (this.totalRows * this.sampleRate));
- LOG.info("Sampling 1 every " + everyN + " out of " + perClientRunRows + " total rows.");
- this.reportLatency = options.isReportLatency();
- this.multiGet = options.getMultiGet();
- if (this.multiGet > 0) {
- LOG.info("MultiGet enabled. Sending GETs in batches of " + this.multiGet + ".");
- this.gets = new ArrayList<Get>(this.multiGet);
+ everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
+ LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
+ if (opts.multiGet > 0) {
+ LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
+ this.gets = new ArrayList<Get>(opts.multiGet);
}
- if (this.reportLatency) {
- this.times = new double[(int) Math.ceil(this.perClientRunRows * this.sampleRate / Math.max(1, this.multiGet))];
+ if (opts.reportLatency) {
+ this.times = new double[(int) Math.ceil(opts.perClientRunRows * opts.sampleRate / Math.max(1, opts.multiGet))];
} else {
this.times = null;
}
@@ -1167,14 +741,14 @@ public class PerformanceEvaluation exten
@Override
void testRow(final int i) throws IOException {
if (i % everyN == 0) {
- Get get = new Get(getRandomRow(this.rand, this.totalRows));
+ Get get = new Get(getRandomRow(this.rand, opts.totalRows));
get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
- if (this.multiGet > 0) {
+ if (opts.multiGet > 0) {
this.gets.add(get);
- if (this.gets.size() == this.multiGet) {
+ if (this.gets.size() == opts.multiGet) {
long start = System.nanoTime();
this.table.get(this.gets);
- if (this.reportLatency) {
+ if (opts.reportLatency) {
times[idx++] = (System.nanoTime() - start) / 1e6;
}
this.gets.clear();
@@ -1182,7 +756,7 @@ public class PerformanceEvaluation exten
} else {
long start = System.nanoTime();
this.table.get(get);
- if (this.reportLatency) {
+ if (opts.reportLatency) {
times[idx++] = (System.nanoTime() - start) / 1e6;
}
}
@@ -1191,8 +765,8 @@ public class PerformanceEvaluation exten
@Override
protected int getReportingPeriod() {
- int period = this.perClientRunRows / 100;
- return period == 0 ? this.perClientRunRows : period;
+ int period = opts.perClientRunRows / 100;
+ return period == 0 ? opts.perClientRunRows : period;
}
@Override
@@ -1202,7 +776,7 @@ public class PerformanceEvaluation exten
this.gets.clear();
}
super.testTakedown();
- if (this.reportLatency) {
+ if (opts.reportLatency) {
Arrays.sort(times);
DescriptiveStatistics ds = new DescriptiveStatistics();
for (double t : times) {
@@ -1230,13 +804,13 @@ public class PerformanceEvaluation exten
@Override
void testRow(final int i) throws IOException {
- byte[] row = getRandomRow(this.rand, this.totalRows);
+ byte[] row = getRandomRow(this.rand, opts.totalRows);
Put put = new Put(row);
byte[] value = generateData(this.rand, VALUE_LENGTH);
- if (useTags) {
+ if (opts.useTags) {
byte[] tag = generateData(this.rand, TAG_LENGTH);
- Tag[] tags = new Tag[noOfTags];
- for (int n = 0; n < noOfTags; n++) {
+ Tag[] tags = new Tag[opts.noOfTags];
+ for (int n = 0; n < opts.noOfTags; n++) {
Tag t = new Tag((byte) n, tag);
tags[n] = t;
}
@@ -1246,7 +820,7 @@ public class PerformanceEvaluation exten
} else {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}
- put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+ put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
table.put(put);
}
}
@@ -1271,7 +845,7 @@ public class PerformanceEvaluation exten
@Override
void testRow(final int i) throws IOException {
if (this.testScanner == null) {
- Scan scan = new Scan(format(this.startRow));
+ Scan scan = new Scan(format(opts.startRow));
scan.setCaching(30);
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
this.testScanner = table.getScanner(scan);
@@ -1304,10 +878,10 @@ public class PerformanceEvaluation exten
byte[] row = format(i);
Put put = new Put(row);
byte[] value = generateData(this.rand, VALUE_LENGTH);
- if (useTags) {
+ if (opts.useTags) {
byte[] tag = generateData(this.rand, TAG_LENGTH);
- Tag[] tags = new Tag[noOfTags];
- for (int n = 0; n < noOfTags; n++) {
+ Tag[] tags = new Tag[opts.noOfTags];
+ for (int n = 0; n < opts.noOfTags; n++) {
Tag t = new Tag((byte) n, tag);
tags[n] = t;
}
@@ -1317,7 +891,7 @@ public class PerformanceEvaluation exten
} else {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}
- put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+ put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
table.put(put);
}
}
@@ -1420,26 +994,21 @@ public class PerformanceEvaluation exten
return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
}
- long runOneClient(final Class<? extends Test> cmd, final int startRow,
- final int perClientRunRows, final int totalRows, final float sampleRate,
- boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags,
- boolean reportLatency, int multiGet, HConnection connection, final Status status)
- throws IOException {
- status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
- perClientRunRows + " rows");
+ static long runOneClient(final Class<? extends Test> cmd, Configuration conf, TestOptions opts,
+ final Status status)
+ throws IOException {
+ status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " +
+ opts.perClientRunRows + " rows");
long totalElapsedTime = 0;
- TestOptions options = new TestOptions(startRow, perClientRunRows,
- totalRows, sampleRate, N, tableName, flushCommits, writeToWAL, useTags, noOfTags,
- reportLatency, multiGet, connection);
final Test t;
try {
- Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
- Configuration.class, TestOptions.class, Status.class);
- t = constructor.newInstance(getConf(), options, status);
+ Constructor<? extends Test> constructor =
+ cmd.getDeclaredConstructor(Configuration.class, TestOptions.class, Status.class);
+ t = constructor.newInstance(conf, opts, status);
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Invalid command class: " +
- cmd.getName() + ". It does not provide a constructor as described by" +
+ cmd.getName() + ". It does not provide a constructor as described by " +
"the javadoc comment. Available constructors are: " +
Arrays.toString(cmd.getConstructors()));
} catch (Exception e) {
@@ -1448,41 +1017,24 @@ public class PerformanceEvaluation exten
totalElapsedTime = t.test();
status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
- "ms at offset " + startRow + " for " + perClientRunRows + " rows" +
- " (" + calculateMbps((int)(perClientRunRows * sampleRate), totalElapsedTime) + ")");
+ "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
+ " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime) + ")");
return totalElapsedTime;
}
- private void runNIsOne(final Class<? extends Test> cmd) throws IOException {
- Status status = new Status() {
- public void setStatus(String msg) throws IOException {
- LOG.info(msg);
- }
- };
-
+ private void runTest(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
+ InterruptedException, ClassNotFoundException {
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(getConf());
- checkTable(admin);
- runOneClient(cmd, 0, this.R, this.R, this.sampleRate, this.flushCommits,
- this.writeToWAL, this.useTags, this.noOfTags, this.reportLatency, this.multiGet,
- this.connection, status);
- } catch (Exception e) {
- LOG.error("Failed", e);
+ checkTable(admin, opts);
} finally {
if (admin != null) admin.close();
}
- }
-
- private void runTest(final Class<? extends Test> cmd) throws IOException,
- InterruptedException, ClassNotFoundException {
- if (N == 1) {
- // If there is only one client and one HRegionServer, we assume nothing
- // has been set up at all.
- runNIsOne(cmd);
+ if (opts.nomapred) {
+ doLocalClients(cmd, opts);
} else {
- // Else, run
- runNIsMoreThanOne(cmd);
+ doMapReduce(cmd, opts);
}
}
@@ -1542,16 +1094,15 @@ public class PerformanceEvaluation exten
+ " sequentialWrite 1");
}
- private void getArgs(final int start, final String[] args) {
+ private static int getNumClients(final int start, final String[] args) {
if(start + 1 > args.length) {
throw new IllegalArgumentException("must supply the number of clients");
}
- N = Integer.parseInt(args[start]);
+ int N = Integer.parseInt(args[start]);
if (N < 1) {
throw new IllegalArgumentException("Number of clients must be > 1");
}
- // Set total number of rows to write.
- this.R = this.R * N;
+ return N;
}
public int run(String[] args) throws Exception {
@@ -1569,7 +1120,9 @@ public class PerformanceEvaluation exten
// input, take a look at writeInputFile().
// Then you must adapt the LINE_PATTERN input regex,
// and parse the argument, take a look at PEInputFormat.getSplits().
-
+
+ TestOptions opts = new TestOptions();
+
for (int i = 0; i < args.length; i++) {
String cmd = args[i];
if (cmd.equals("-h") || cmd.startsWith("--h")) {
@@ -1580,94 +1133,94 @@ public class PerformanceEvaluation exten
final String nmr = "--nomapred";
if (cmd.startsWith(nmr)) {
- this.nomapred = true;
+ opts.nomapred = true;
continue;
}
final String rows = "--rows=";
if (cmd.startsWith(rows)) {
- this.R = Integer.parseInt(cmd.substring(rows.length()));
+ opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
continue;
}
final String sampleRate = "--sampleRate=";
if (cmd.startsWith(sampleRate)) {
- this.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
+ opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
continue;
}
final String table = "--table=";
if (cmd.startsWith(table)) {
- this.tableName = TableName.valueOf(cmd.substring(table.length()));
+ opts.tableName = cmd.substring(table.length());
continue;
}
final String compress = "--compress=";
if (cmd.startsWith(compress)) {
- this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
+ opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
continue;
}
final String blockEncoding = "--blockEncoding=";
if (cmd.startsWith(blockEncoding)) {
- this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
+ opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
continue;
}
final String flushCommits = "--flushCommits=";
if (cmd.startsWith(flushCommits)) {
- this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
+ opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
continue;
}
final String writeToWAL = "--writeToWAL=";
if (cmd.startsWith(writeToWAL)) {
- this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
+ opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
continue;
}
final String presplit = "--presplit=";
if (cmd.startsWith(presplit)) {
- this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
+ opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
continue;
}
final String inMemory = "--inmemory=";
if (cmd.startsWith(inMemory)) {
- this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
+ opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
continue;
}
final String latency = "--latency";
if (cmd.startsWith(latency)) {
- this.reportLatency = true;
+ opts.reportLatency = true;
continue;
}
final String multiGet = "--multiGet=";
if (cmd.startsWith(multiGet)) {
- this.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
+ opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
continue;
}
- this.connection = HConnectionManager.createConnection(getConf());
-
final String useTags = "--usetags=";
if (cmd.startsWith(useTags)) {
- this.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
+ opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
continue;
}
final String noOfTags = "--nooftags=";
if (cmd.startsWith(noOfTags)) {
- this.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
+ opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
continue;
}
Class<? extends Test> cmdClass = determineCommandClass(cmd);
if (cmdClass != null) {
- getArgs(i + 1, args);
- runTest(cmdClass);
+ opts.numClientThreads = getNumClients(i + 1, args);
+ // total number of rows = rows per client * number of client threads
+ opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
+ runTest(cmdClass, opts);
errCode = 0;
break;
}