You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/04/10 09:55:24 UTC

[03/28] hbase git commit: HBASE-13118 [PE] Add the ability to write many columns

HBASE-13118 [PE] Add the ability to write many columns

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/66f7bf46
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/66f7bf46
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/66f7bf46

Branch: refs/heads/hbase-12439
Commit: 66f7bf461532423d92be4e57a26edc2e13a7dbad
Parents: ed70376
Author: stack <st...@apache.org>
Authored: Thu Apr 9 16:49:16 2015 -0700
Committer: stack <st...@apache.org>
Committed: Thu Apr 9 16:52:11 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/PerformanceEvaluation.java     | 152 +++++++++++++------
 .../hbase/mapreduce/TestHFileOutputFormat.java  |   9 +-
 .../hbase/mapreduce/TestHFileOutputFormat2.java |   9 +-
 3 files changed, 115 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/66f7bf46/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index d51f698..eb2f468 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -124,7 +124,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
 
   public static final String TABLE_NAME = "TestTable";
   public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
-  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
+  public static final byte [] COLUMN_ZERO = Bytes.toBytes("" + 0);
+  public static final byte [] QUALIFIER_NAME = COLUMN_ZERO;
   public static final int DEFAULT_VALUE_LENGTH = 1000;
   public static final int ROW_LENGTH = 26;
 
@@ -610,6 +611,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
     int valueSize = DEFAULT_VALUE_LENGTH;
     int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
     int cycles = 1;
+    int columns = 1;
+    int caching = 30;
     boolean addColumns = true;
 
     public TestOptions() {}
@@ -653,6 +656,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
       this.randomSleep = that.randomSleep;
       this.measureAfter = that.measureAfter;
       this.addColumns = that.addColumns;
+      this.columns = that.columns;
+      this.caching = that.caching;
+    }
+
+    public int getCaching() {
+      return this.caching;
+    }
+
+    public void setCaching(final int caching) {
+      this.caching = caching;
+    }
+
+    public int getColumns() {
+      return this.columns;
+    }
+
+    public void setColumns(final int columns) {
+      this.columns = columns;
     }
 
     public int getCycles() {
@@ -1157,7 +1178,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void onStartup() throws IOException {
       this.table = connection.getTable(TableName.valueOf(opts.tableName));
     }
-    
+
     @Override
     void onTakedown() throws IOException {
       table.close();
@@ -1175,7 +1196,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void onStartup() throws IOException {
       this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName));
     }
-    
+
     @Override
     void onTakedown() throws IOException {
       mutator.close();
@@ -1190,9 +1211,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
     @Override
     void testRow(final int i) throws IOException {
       Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows));
+      scan.setCaching(opts.caching);
       FilterList list = new FilterList();
       if (opts.addColumns) {
         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        scan.addFamily(FAMILY_NAME);
       }
       if (opts.filterAll) {
         list.addFilter(new FilterAllFilter());
@@ -1223,11 +1247,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void testRow(final int i) throws IOException {
       Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
       Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
+      scan.setCaching(opts.caching);
       if (opts.filterAll) {
         scan.setFilter(new FilterAllFilter());
       }
       if (opts.addColumns) {
         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        scan.addFamily(FAMILY_NAME);
       }
       Result r = null;
       int count = 0;
@@ -1326,6 +1353,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
       Get get = new Get(getRandomRow(this.rand, opts.totalRows));
       if (opts.addColumns) {
         get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        get.addFamily(FAMILY_NAME);
       }
       if (opts.filterAll) {
         get.setFilter(new FilterAllFilter());
@@ -1369,21 +1398,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void testRow(final int i) throws IOException {
       byte[] row = getRandomRow(this.rand, opts.totalRows);
       Put put = new Put(row);
-      byte[] value = generateData(this.rand, getValueLength(this.rand));
-      if (opts.useTags) {
-        byte[] tag = generateData(this.rand, TAG_LENGTH);
-        Tag[] tags = new Tag[opts.noOfTags];
-        for (int n = 0; n < opts.noOfTags; n++) {
-          Tag t = new Tag((byte) n, tag);
-          tags[n] = t;
+      for (int column = 0; column < opts.columns; column++) {
+        byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
+        byte[] value = generateData(this.rand, getValueLength(this.rand));
+        if (opts.useTags) {
+          byte[] tag = generateData(this.rand, TAG_LENGTH);
+          Tag[] tags = new Tag[opts.noOfTags];
+          for (int n = 0; n < opts.noOfTags; n++) {
+            Tag t = new Tag((byte) n, tag);
+            tags[n] = t;
+          }
+          KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
+              value, tags);
+          put.add(kv);
+          updateValueSize(kv.getValueLength());
+        } else {
+          put.add(FAMILY_NAME, qualifier, value);
+          updateValueSize(value.length);
         }
-        KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
-            value, tags);
-        put.add(kv);
-        updateValueSize(kv.getValueLength());
-      } else {
-        put.add(FAMILY_NAME, QUALIFIER_NAME, value);
-        updateValueSize(value.length);
       }
       put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
       mutator.mutate(put);
@@ -1410,9 +1442,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void testRow(final int i) throws IOException {
       if (this.testScanner == null) {
         Scan scan = new Scan(format(opts.startRow));
-        scan.setCaching(30);
+        scan.setCaching(opts.caching);
         if (opts.addColumns) {
           scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+        } else {
+          scan.addFamily(FAMILY_NAME);
         }
         if (opts.filterAll) {
           scan.setFilter(new FilterAllFilter());
@@ -1452,21 +1486,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void testRow(final int i) throws IOException {
       byte[] row = format(i);
       Put put = new Put(row);
-      byte[] value = generateData(this.rand, getValueLength(this.rand));
-      if (opts.useTags) {
-        byte[] tag = generateData(this.rand, TAG_LENGTH);
-        Tag[] tags = new Tag[opts.noOfTags];
-        for (int n = 0; n < opts.noOfTags; n++) {
-          Tag t = new Tag((byte) n, tag);
-          tags[n] = t;
+      for (int column = 0; column < opts.columns; column++) {
+        byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
+        byte[] value = generateData(this.rand, getValueLength(this.rand));
+        if (opts.useTags) {
+          byte[] tag = generateData(this.rand, TAG_LENGTH);
+          Tag[] tags = new Tag[opts.noOfTags];
+          for (int n = 0; n < opts.noOfTags; n++) {
+            Tag t = new Tag((byte) n, tag);
+            tags[n] = t;
+          }
+          KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
+              value, tags);
+          put.add(kv);
+          updateValueSize(kv.getValueLength());
+        } else {
+          put.add(FAMILY_NAME, qualifier, value);
+          updateValueSize(value.length);
         }
-        KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
-            value, tags);
-        put.add(kv);
-        updateValueSize(kv.getValueLength());
-      } else {
-        put.add(FAMILY_NAME, QUALIFIER_NAME, value);
-        updateValueSize(value.length);
       }
       put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
       mutator.mutate(put);
@@ -1498,7 +1535,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     protected Scan constructScan(byte[] valuePrefix) throws IOException {
       FilterList list = new FilterList();
       Filter filter = new SingleColumnValueFilter(
-          FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
+          FAMILY_NAME, COLUMN_ZERO, CompareFilter.CompareOp.EQUAL,
           new BinaryComparator(valuePrefix)
       );
       list.addFilter(filter);
@@ -1506,8 +1543,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
         list.addFilter(new FilterAllFilter());
       }
       Scan scan = new Scan();
+      scan.setCaching(opts.caching);
       if (opts.addColumns) {
         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        scan.addFamily(FAMILY_NAME);
       }
       scan.setFilter(list);
       return scan;
@@ -1520,11 +1560,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
    * @param timeMs Time taken in milliseconds.
    * @return String value with label, ie '123.76 MB/s'
    */
-  private static String calculateMbps(int rows, long timeMs, final int valueSize) {
-    // MB/s = ((totalRows * ROW_SIZE_BYTES) / totalTimeMS)
-    //        * 1000 MS_PER_SEC / (1024 * 1024) BYTES_PER_MB
-    BigDecimal rowSize =
-      BigDecimal.valueOf(ROW_LENGTH + valueSize + FAMILY_NAME.length + QUALIFIER_NAME.length);
+  private static String calculateMbps(int rows, long timeMs, final int valueSize, int columns) {
+    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
+      ((valueSize + FAMILY_NAME.length + COLUMN_ZERO.length) * columns));
     BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
       .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
       .divide(BYTES_PER_MB, CXT);
@@ -1613,7 +1651,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
       "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
       " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
-          getAverageValueLength(opts)) + ")");
+          getAverageValueLength(opts), opts.columns) + ")");
 
     return new RunResult(totalElapsedTime, t.getLatency());
   }
@@ -1645,14 +1683,23 @@ public class PerformanceEvaluation extends Configured implements Tool {
   }
 
   protected void printUsage() {
-    printUsage(null);
+    printUsage(this.getClass().getName(), null);
+  }
+
+  protected static void printUsage(final String message) {
+    printUsage(PerformanceEvaluation.class.getName(), message);
+  }
+
+  protected static void printUsageAndExit(final String message, final int exitCode) {
+    printUsage(message);
+    System.exit(exitCode);
   }
 
-  protected void printUsage(final String message) {
+  protected static void printUsage(final String className, final String message) {
     if (message != null && message.length() > 0) {
       System.err.println(message);
     }
-    System.err.println("Usage: java " + this.getClass().getName() + " \\");
+    System.err.println("Usage: java " + className + " \\");
     System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
     System.err.println();
     System.err.println("Options:");
@@ -1704,6 +1751,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
     System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
     System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
     System.err.println(" randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
+    System.err.println(" columns         Columns to write per row. Default: 1");
+    System.err.println(" caching         Scan caching to use. Default: 30");
     System.err.println();
     System.err.println(" Note: -D properties will be applied to the conf used. ");
     System.err.println("  For example: ");
@@ -1721,8 +1770,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     System.err.println("                 running: 1 <= value <= 500");
     System.err.println("Examples:");
     System.err.println(" To run a single evaluation client:");
-    System.err.println(" $ bin/hbase " + this.getClass().getName()
-        + " sequentialWrite 1");
+    System.err.println(" $ bin/hbase " + className + " sequentialWrite 1");
   }
 
   /**
@@ -1935,6 +1983,18 @@ public class PerformanceEvaluation extends Configured implements Tool {
         continue;
       }
 
+      final String columns = "--columns=";
+      if (cmd.startsWith(columns)) {
+        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
+        continue;
+      }
+
+      final String caching = "--caching=";
+      if (cmd.startsWith(caching)) {
+        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
+        continue;
+      }
+
       if (isCommandClass(cmd)) {
         opts.cmdName = cmd;
         opts.numClientThreads = Integer.parseInt(args.remove());
@@ -1945,6 +2005,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
         }
         opts = calculateRowsAndSize(opts);
         break;
+      } else {
+        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
       }
 
       // Not matching any option or command.
@@ -1970,7 +2032,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
   }
 
   static int getRowsPerGB(final TestOptions opts) {
-    return ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize);
+    return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getColumns());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/66f7bf46/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index ecea98e..438266e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -125,6 +125,7 @@ public class TestHFileOutputFormat  {
     private int valLength;
     private static final int VALLEN_DEFAULT=10;
     private static final String VALLEN_CONF="randomkv.val.length";
+    private static final byte [] QUALIFIER = Bytes.toBytes("data");
 
     @Override
     protected void setup(Context context) throws IOException,
@@ -159,8 +160,7 @@ public class TestHFileOutputFormat  {
         ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
 
         for (byte[] family : TestHFileOutputFormat.FAMILIES) {
-          KeyValue kv = new KeyValue(keyBytes, family,
-              PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+          KeyValue kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
           context.write(key, kv);
         }
       }
@@ -878,7 +878,7 @@ public class TestHFileOutputFormat  {
 
     int taskId = context.getTaskAttemptID().getTaskID().getId();
     assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
-
+    final byte [] qualifier = Bytes.toBytes("data");
     Random random = new Random();
     for (int i = 0; i < numRows; i++) {
 
@@ -887,8 +887,7 @@ public class TestHFileOutputFormat  {
       ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
 
       for (byte[] family : families) {
-        KeyValue kv = new KeyValue(keyBytes, family,
-            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+        KeyValue kv = new KeyValue(keyBytes, family, qualifier, valBytes);
         writer.write(key, kv);
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66f7bf46/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 0f60f3b..67a6c0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -124,6 +124,7 @@ public class TestHFileOutputFormat2  {
     private int valLength;
     private static final int VALLEN_DEFAULT=10;
     private static final String VALLEN_CONF="randomkv.val.length";
+    private static final byte [] QUALIFIER = Bytes.toBytes("data");
 
     @Override
     protected void setup(Context context) throws IOException,
@@ -159,8 +160,7 @@ public class TestHFileOutputFormat2  {
         ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
 
         for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
-          Cell kv = new KeyValue(keyBytes, family,
-              PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+          Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
           context.write(key, kv);
         }
       }
@@ -879,7 +879,7 @@ public class TestHFileOutputFormat2  {
 
     int taskId = context.getTaskAttemptID().getTaskID().getId();
     assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
-
+    final byte [] qualifier = Bytes.toBytes("data");
     Random random = new Random();
     for (int i = 0; i < numRows; i++) {
 
@@ -888,8 +888,7 @@ public class TestHFileOutputFormat2  {
       ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
 
       for (byte[] family : families) {
-        Cell kv = new KeyValue(keyBytes, family,
-            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
         writer.write(key, kv);
       }
     }