You are viewing a plain-text version of this content; the hyperlink to its canonical version did not survive the plain-text conversion.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/04/10 09:55:24 UTC
[03/28] hbase git commit: HBASE-13118 [PE] Add being able to write many columns
HBASE-13118 [PE] Add being able to write many columns
Conflicts:
hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/66f7bf46
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/66f7bf46
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/66f7bf46
Branch: refs/heads/hbase-12439
Commit: 66f7bf461532423d92be4e57a26edc2e13a7dbad
Parents: ed70376
Author: stack <st...@apache.org>
Authored: Thu Apr 9 16:49:16 2015 -0700
Committer: stack <st...@apache.org>
Committed: Thu Apr 9 16:52:11 2015 -0700
----------------------------------------------------------------------
.../hadoop/hbase/PerformanceEvaluation.java | 152 +++++++++++++------
.../hbase/mapreduce/TestHFileOutputFormat.java | 9 +-
.../hbase/mapreduce/TestHFileOutputFormat2.java | 9 +-
3 files changed, 115 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/66f7bf46/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index d51f698..eb2f468 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -124,7 +124,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
public static final String TABLE_NAME = "TestTable";
public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
- public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
+ public static final byte [] COLUMN_ZERO = Bytes.toBytes("" + 0);
+ public static final byte [] QUALIFIER_NAME = COLUMN_ZERO;
public static final int DEFAULT_VALUE_LENGTH = 1000;
public static final int ROW_LENGTH = 26;
@@ -610,6 +611,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
int valueSize = DEFAULT_VALUE_LENGTH;
int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
int cycles = 1;
+ int columns = 1;
+ int caching = 30;
boolean addColumns = true;
public TestOptions() {}
@@ -653,6 +656,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
this.randomSleep = that.randomSleep;
this.measureAfter = that.measureAfter;
this.addColumns = that.addColumns;
+ this.columns = that.columns;
+ this.caching = that.caching;
+ }
+
+ public int getCaching() {
+ return this.caching;
+ }
+
+ public void setCaching(final int caching) {
+ this.caching = caching;
+ }
+
+ public int getColumns() {
+ return this.columns;
+ }
+
+ public void setColumns(final int columns) {
+ this.columns = columns;
}
public int getCycles() {
@@ -1157,7 +1178,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
void onStartup() throws IOException {
this.table = connection.getTable(TableName.valueOf(opts.tableName));
}
-
+
@Override
void onTakedown() throws IOException {
table.close();
@@ -1175,7 +1196,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
void onStartup() throws IOException {
this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName));
}
-
+
@Override
void onTakedown() throws IOException {
mutator.close();
@@ -1190,9 +1211,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
@Override
void testRow(final int i) throws IOException {
Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows));
+ scan.setCaching(opts.caching);
FilterList list = new FilterList();
if (opts.addColumns) {
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+ } else {
+ scan.addFamily(FAMILY_NAME);
}
if (opts.filterAll) {
list.addFilter(new FilterAllFilter());
@@ -1223,11 +1247,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testRow(final int i) throws IOException {
Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
+ scan.setCaching(opts.caching);
if (opts.filterAll) {
scan.setFilter(new FilterAllFilter());
}
if (opts.addColumns) {
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+ } else {
+ scan.addFamily(FAMILY_NAME);
}
Result r = null;
int count = 0;
@@ -1326,6 +1353,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
Get get = new Get(getRandomRow(this.rand, opts.totalRows));
if (opts.addColumns) {
get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+ } else {
+ get.addFamily(FAMILY_NAME);
}
if (opts.filterAll) {
get.setFilter(new FilterAllFilter());
@@ -1369,21 +1398,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testRow(final int i) throws IOException {
byte[] row = getRandomRow(this.rand, opts.totalRows);
Put put = new Put(row);
- byte[] value = generateData(this.rand, getValueLength(this.rand));
- if (opts.useTags) {
- byte[] tag = generateData(this.rand, TAG_LENGTH);
- Tag[] tags = new Tag[opts.noOfTags];
- for (int n = 0; n < opts.noOfTags; n++) {
- Tag t = new Tag((byte) n, tag);
- tags[n] = t;
+ for (int column = 0; column < opts.columns; column++) {
+ byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
+ byte[] value = generateData(this.rand, getValueLength(this.rand));
+ if (opts.useTags) {
+ byte[] tag = generateData(this.rand, TAG_LENGTH);
+ Tag[] tags = new Tag[opts.noOfTags];
+ for (int n = 0; n < opts.noOfTags; n++) {
+ Tag t = new Tag((byte) n, tag);
+ tags[n] = t;
+ }
+ KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
+ value, tags);
+ put.add(kv);
+ updateValueSize(kv.getValueLength());
+ } else {
+ put.add(FAMILY_NAME, qualifier, value);
+ updateValueSize(value.length);
}
- KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
- value, tags);
- put.add(kv);
- updateValueSize(kv.getValueLength());
- } else {
- put.add(FAMILY_NAME, QUALIFIER_NAME, value);
- updateValueSize(value.length);
}
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
mutator.mutate(put);
@@ -1410,9 +1442,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testRow(final int i) throws IOException {
if (this.testScanner == null) {
Scan scan = new Scan(format(opts.startRow));
- scan.setCaching(30);
+ scan.setCaching(opts.caching);
if (opts.addColumns) {
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+ } else {
+ scan.addFamily(FAMILY_NAME);
}
if (opts.filterAll) {
scan.setFilter(new FilterAllFilter());
@@ -1452,21 +1486,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
void testRow(final int i) throws IOException {
byte[] row = format(i);
Put put = new Put(row);
- byte[] value = generateData(this.rand, getValueLength(this.rand));
- if (opts.useTags) {
- byte[] tag = generateData(this.rand, TAG_LENGTH);
- Tag[] tags = new Tag[opts.noOfTags];
- for (int n = 0; n < opts.noOfTags; n++) {
- Tag t = new Tag((byte) n, tag);
- tags[n] = t;
+ for (int column = 0; column < opts.columns; column++) {
+ byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
+ byte[] value = generateData(this.rand, getValueLength(this.rand));
+ if (opts.useTags) {
+ byte[] tag = generateData(this.rand, TAG_LENGTH);
+ Tag[] tags = new Tag[opts.noOfTags];
+ for (int n = 0; n < opts.noOfTags; n++) {
+ Tag t = new Tag((byte) n, tag);
+ tags[n] = t;
+ }
+ KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
+ value, tags);
+ put.add(kv);
+ updateValueSize(kv.getValueLength());
+ } else {
+ put.add(FAMILY_NAME, qualifier, value);
+ updateValueSize(value.length);
}
- KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
- value, tags);
- put.add(kv);
- updateValueSize(kv.getValueLength());
- } else {
- put.add(FAMILY_NAME, QUALIFIER_NAME, value);
- updateValueSize(value.length);
}
put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
mutator.mutate(put);
@@ -1498,7 +1535,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
protected Scan constructScan(byte[] valuePrefix) throws IOException {
FilterList list = new FilterList();
Filter filter = new SingleColumnValueFilter(
- FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
+ FAMILY_NAME, COLUMN_ZERO, CompareFilter.CompareOp.EQUAL,
new BinaryComparator(valuePrefix)
);
list.addFilter(filter);
@@ -1506,8 +1543,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
list.addFilter(new FilterAllFilter());
}
Scan scan = new Scan();
+ scan.setCaching(opts.caching);
if (opts.addColumns) {
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+ } else {
+ scan.addFamily(FAMILY_NAME);
}
scan.setFilter(list);
return scan;
@@ -1520,11 +1560,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
* @param timeMs Time taken in milliseconds.
* @return String value with label, ie '123.76 MB/s'
*/
- private static String calculateMbps(int rows, long timeMs, final int valueSize) {
- // MB/s = ((totalRows * ROW_SIZE_BYTES) / totalTimeMS)
- // * 1000 MS_PER_SEC / (1024 * 1024) BYTES_PER_MB
- BigDecimal rowSize =
- BigDecimal.valueOf(ROW_LENGTH + valueSize + FAMILY_NAME.length + QUALIFIER_NAME.length);
+ private static String calculateMbps(int rows, long timeMs, final int valueSize, int columns) {
+ BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
+ ((valueSize + FAMILY_NAME.length + COLUMN_ZERO.length) * columns));
BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
.divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
.divide(BYTES_PER_MB, CXT);
@@ -1613,7 +1651,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
"ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
" (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
- getAverageValueLength(opts)) + ")");
+ getAverageValueLength(opts), opts.columns) + ")");
return new RunResult(totalElapsedTime, t.getLatency());
}
@@ -1645,14 +1683,23 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
protected void printUsage() {
- printUsage(null);
+ printUsage(this.getClass().getName(), null);
+ }
+
+ protected static void printUsage(final String message) {
+ printUsage(PerformanceEvaluation.class.getName(), message);
+ }
+
+ protected static void printUsageAndExit(final String message, final int exitCode) {
+ printUsage(message);
+ System.exit(exitCode);
}
- protected void printUsage(final String message) {
+ protected static void printUsage(final String className, final String message) {
if (message != null && message.length() > 0) {
System.err.println(message);
}
- System.err.println("Usage: java " + this.getClass().getName() + " \\");
+ System.err.println("Usage: java " + className + " \\");
System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>");
System.err.println();
System.err.println("Options:");
@@ -1704,6 +1751,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
System.err.println(" cycles How many times to cycle the test. Defaults: 1.");
System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table.");
System.err.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0");
+ System.err.println(" columns Columns to write per row. Default: 1");
+ System.err.println(" caching Scan caching to use. Default: 30");
System.err.println();
System.err.println(" Note: -D properties will be applied to the conf used. ");
System.err.println(" For example: ");
@@ -1721,8 +1770,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
System.err.println(" running: 1 <= value <= 500");
System.err.println("Examples:");
System.err.println(" To run a single evaluation client:");
- System.err.println(" $ bin/hbase " + this.getClass().getName()
- + " sequentialWrite 1");
+ System.err.println(" $ bin/hbase " + className + " sequentialWrite 1");
}
/**
@@ -1935,6 +1983,18 @@ public class PerformanceEvaluation extends Configured implements Tool {
continue;
}
+ final String columns = "--columns=";
+ if (cmd.startsWith(columns)) {
+ opts.columns = Integer.parseInt(cmd.substring(columns.length()));
+ continue;
+ }
+
+ final String caching = "--caching=";
+ if (cmd.startsWith(caching)) {
+ opts.caching = Integer.parseInt(cmd.substring(caching.length()));
+ continue;
+ }
+
if (isCommandClass(cmd)) {
opts.cmdName = cmd;
opts.numClientThreads = Integer.parseInt(args.remove());
@@ -1945,6 +2005,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
opts = calculateRowsAndSize(opts);
break;
+ } else {
+ printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
}
// Not matching any option or command.
@@ -1970,7 +2032,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
}
static int getRowsPerGB(final TestOptions opts) {
- return ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize);
+ return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getColumns());
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/66f7bf46/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index ecea98e..438266e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -125,6 +125,7 @@ public class TestHFileOutputFormat {
private int valLength;
private static final int VALLEN_DEFAULT=10;
private static final String VALLEN_CONF="randomkv.val.length";
+ private static final byte [] QUALIFIER = Bytes.toBytes("data");
@Override
protected void setup(Context context) throws IOException,
@@ -159,8 +160,7 @@ public class TestHFileOutputFormat {
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
for (byte[] family : TestHFileOutputFormat.FAMILIES) {
- KeyValue kv = new KeyValue(keyBytes, family,
- PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+ KeyValue kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
context.write(key, kv);
}
}
@@ -878,7 +878,7 @@ public class TestHFileOutputFormat {
int taskId = context.getTaskAttemptID().getTaskID().getId();
assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
-
+ final byte [] qualifier = Bytes.toBytes("data");
Random random = new Random();
for (int i = 0; i < numRows; i++) {
@@ -887,8 +887,7 @@ public class TestHFileOutputFormat {
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
for (byte[] family : families) {
- KeyValue kv = new KeyValue(keyBytes, family,
- PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+ KeyValue kv = new KeyValue(keyBytes, family, qualifier, valBytes);
writer.write(key, kv);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/66f7bf46/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 0f60f3b..67a6c0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -124,6 +124,7 @@ public class TestHFileOutputFormat2 {
private int valLength;
private static final int VALLEN_DEFAULT=10;
private static final String VALLEN_CONF="randomkv.val.length";
+ private static final byte [] QUALIFIER = Bytes.toBytes("data");
@Override
protected void setup(Context context) throws IOException,
@@ -159,8 +160,7 @@ public class TestHFileOutputFormat2 {
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
- Cell kv = new KeyValue(keyBytes, family,
- PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+ Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
context.write(key, kv);
}
}
@@ -879,7 +879,7 @@ public class TestHFileOutputFormat2 {
int taskId = context.getTaskAttemptID().getTaskID().getId();
assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
-
+ final byte [] qualifier = Bytes.toBytes("data");
Random random = new Random();
for (int i = 0; i < numRows; i++) {
@@ -888,8 +888,7 @@ public class TestHFileOutputFormat2 {
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
for (byte[] family : families) {
- Cell kv = new KeyValue(keyBytes, family,
- PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+ Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
writer.write(key, kv);
}
}