Posted to commits@hbase.apache.org by st...@apache.org on 2015/04/10 01:49:43 UTC

hbase git commit: HBASE-13118 [PE] Add being able to write many columns

Repository: hbase
Updated Branches:
  refs/heads/branch-1.0 bbb5a28dc -> a5d8f6256


HBASE-13118 [PE] Add being able to write many columns


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a5d8f625
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a5d8f625
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a5d8f625

Branch: refs/heads/branch-1.0
Commit: a5d8f62565e7a38e0d7ee018ef7b6c4a8fca53d3
Parents: bbb5a28
Author: stack <st...@apache.org>
Authored: Thu Apr 9 16:49:16 2015 -0700
Committer: stack <st...@apache.org>
Committed: Thu Apr 9 16:49:34 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/PerformanceEvaluation.java     | 156 +++++++++++++------
 .../hbase/mapreduce/TestHFileOutputFormat.java  |   9 +-
 .../hbase/mapreduce/TestHFileOutputFormat2.java |   9 +-
 3 files changed, 119 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
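
This change adds two options to PerformanceEvaluation: --columns (number of qualifiers written per row, default 1) and --caching (scan caching, default 30). Column 0 keeps the qualifier "0" (COLUMN_ZERO); additional columns use their index as the qualifier. A hypothetical invocation exercising both new flags, with values chosen only for illustration, might look like:

  $ bin/hbase org.apache.hadoop.hbase.PerformanceEvaluation --columns=10 --caching=100 sequentialWrite 1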


http://git-wip-us.apache.org/repos/asf/hbase/blob/a5d8f625/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index 755372c..17f00ad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -126,7 +126,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
 
   public static final String TABLE_NAME = "TestTable";
   public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
-  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
+  public static final byte [] COLUMN_ZERO = Bytes.toBytes("" + 0);
+  public static final byte [] QUALIFIER_NAME = COLUMN_ZERO;
   public static final int DEFAULT_VALUE_LENGTH = 1000;
   public static final int ROW_LENGTH = 26;
 
@@ -597,6 +598,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
     boolean valueZipf = false;
     int valueSize = DEFAULT_VALUE_LENGTH;
     int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
+    int columns = 1;
+    int caching = 30;
     boolean addColumns = true;
 
     public TestOptions() {}
@@ -638,6 +641,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
       this.period = that.period;
       this.randomSleep = that.randomSleep;
       this.addColumns = that.addColumns;
+      this.columns = that.columns;
+      this.caching = that.caching;
+    }
+
+    public int getCaching() {
+      return this.caching;
+    }
+
+    public void setCaching(final int caching) {
+      this.caching = caching;
+    }
+
+    public int getColumns() {
+      return this.columns;
+    }
+
+    public void setColumns(final int columns) {
+      this.columns = columns;
     }
 
     public boolean isValueZipf() {
@@ -1120,7 +1141,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void onStartup() throws IOException {
       this.table = connection.getTable(TableName.valueOf(opts.tableName));
     }
-    
+
     @Override
     void onTakedown() throws IOException {
       table.close();
@@ -1138,7 +1159,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void onStartup() throws IOException {
       this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName));
     }
-    
+
     @Override
     void onTakedown() throws IOException {
       mutator.close();
@@ -1153,9 +1174,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
     @Override
     void testRow(final int i) throws IOException {
       Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows));
+      scan.setCaching(opts.caching);
       FilterList list = new FilterList();
       if (opts.addColumns) {
         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        scan.addFamily(FAMILY_NAME);
       }
       if (opts.filterAll) {
         list.addFilter(new FilterAllFilter());
@@ -1186,11 +1210,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void testRow(final int i) throws IOException {
       Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
       Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
+      scan.setCaching(opts.caching);
       if (opts.filterAll) {
         scan.setFilter(new FilterAllFilter());
       }
       if (opts.addColumns) {
         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        scan.addFamily(FAMILY_NAME);
       }
       Result r = null;
       int count = 0;
@@ -1289,6 +1316,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
       Get get = new Get(getRandomRow(this.rand, opts.totalRows));
       if (opts.addColumns) {
         get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        get.addFamily(FAMILY_NAME);
       }
       if (opts.filterAll) {
         get.setFilter(new FilterAllFilter());
@@ -1332,21 +1361,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void testRow(final int i) throws IOException {
       byte[] row = getRandomRow(this.rand, opts.totalRows);
       Put put = new Put(row);
-      byte[] value = generateData(this.rand, getValueLength(this.rand));
-      if (opts.useTags) {
-        byte[] tag = generateData(this.rand, TAG_LENGTH);
-        Tag[] tags = new Tag[opts.noOfTags];
-        for (int n = 0; n < opts.noOfTags; n++) {
-          Tag t = new Tag((byte) n, tag);
-          tags[n] = t;
+      for (int column = 0; column < opts.columns; column++) {
+        byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
+        byte[] value = generateData(this.rand, getValueLength(this.rand));
+        if (opts.useTags) {
+          byte[] tag = generateData(this.rand, TAG_LENGTH);
+          Tag[] tags = new Tag[opts.noOfTags];
+          for (int n = 0; n < opts.noOfTags; n++) {
+            Tag t = new Tag((byte) n, tag);
+            tags[n] = t;
+          }
+          KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
+              value, tags);
+          put.add(kv);
+          updateValueSize(kv.getValueLength());
+        } else {
+          put.add(FAMILY_NAME, qualifier, value);
+          updateValueSize(value.length);
         }
-        KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
-            value, tags);
-        put.add(kv);
-        updateValueSize(kv.getValueLength());
-      } else {
-        put.add(FAMILY_NAME, QUALIFIER_NAME, value);
-        updateValueSize(value.length);
       }
       put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
       mutator.mutate(put);
@@ -1373,9 +1405,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void testRow(final int i) throws IOException {
       if (this.testScanner == null) {
         Scan scan = new Scan(format(opts.startRow));
-        scan.setCaching(30);
+        scan.setCaching(opts.caching);
         if (opts.addColumns) {
           scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+        } else {
+          scan.addFamily(FAMILY_NAME);
         }
         if (opts.filterAll) {
           scan.setFilter(new FilterAllFilter());
@@ -1415,21 +1449,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void testRow(final int i) throws IOException {
       byte[] row = format(i);
       Put put = new Put(row);
-      byte[] value = generateData(this.rand, getValueLength(this.rand));
-      if (opts.useTags) {
-        byte[] tag = generateData(this.rand, TAG_LENGTH);
-        Tag[] tags = new Tag[opts.noOfTags];
-        for (int n = 0; n < opts.noOfTags; n++) {
-          Tag t = new Tag((byte) n, tag);
-          tags[n] = t;
+      for (int column = 0; column < opts.columns; column++) {
+        byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
+        byte[] value = generateData(this.rand, getValueLength(this.rand));
+        if (opts.useTags) {
+          byte[] tag = generateData(this.rand, TAG_LENGTH);
+          Tag[] tags = new Tag[opts.noOfTags];
+          for (int n = 0; n < opts.noOfTags; n++) {
+            Tag t = new Tag((byte) n, tag);
+            tags[n] = t;
+          }
+          KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, HConstants.LATEST_TIMESTAMP,
+              value, tags);
+          put.add(kv);
+          updateValueSize(kv.getValueLength());
+        } else {
+          put.add(FAMILY_NAME, qualifier, value);
+          updateValueSize(value.length);
         }
-        KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
-            value, tags);
-        put.add(kv);
-        updateValueSize(kv.getValueLength());
-      } else {
-        put.add(FAMILY_NAME, QUALIFIER_NAME, value);
-        updateValueSize(value.length);
       }
       put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
       mutator.mutate(put);
@@ -1461,7 +1498,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     protected Scan constructScan(byte[] valuePrefix) throws IOException {
       FilterList list = new FilterList();
       Filter filter = new SingleColumnValueFilter(
-          FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
+          FAMILY_NAME, COLUMN_ZERO, CompareFilter.CompareOp.EQUAL,
           new BinaryComparator(valuePrefix)
       );
       list.addFilter(filter);
@@ -1469,8 +1506,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
         list.addFilter(new FilterAllFilter());
       }
       Scan scan = new Scan();
+      scan.setCaching(opts.caching);
       if (opts.addColumns) {
         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        scan.addFamily(FAMILY_NAME);
       }
       scan.setFilter(list);
       return scan;
@@ -1483,11 +1523,9 @@ public class PerformanceEvaluation extends Configured implements Tool {
    * @param timeMs Time taken in milliseconds.
    * @return String value with label, ie '123.76 MB/s'
    */
-  private static String calculateMbps(int rows, long timeMs, final int valueSize) {
-    // MB/s = ((totalRows * ROW_SIZE_BYTES) / totalTimeMS)
-    //        * 1000 MS_PER_SEC / (1024 * 1024) BYTES_PER_MB
-    BigDecimal rowSize =
-      BigDecimal.valueOf(ROW_LENGTH + valueSize + FAMILY_NAME.length + QUALIFIER_NAME.length);
+  private static String calculateMbps(int rows, long timeMs, final int valueSize, int columns) {
+    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
+      ((valueSize + FAMILY_NAME.length + COLUMN_ZERO.length) * columns));
     BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
       .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
       .divide(BYTES_PER_MB, CXT);
@@ -1576,7 +1614,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
       "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
       " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
-          getAverageValueLength(opts)) + ")");
+          getAverageValueLength(opts), opts.columns) + ")");
 
     return new RunResult(totalElapsedTime, t.getLatency());
   }
@@ -1602,14 +1640,23 @@ public class PerformanceEvaluation extends Configured implements Tool {
   }
 
   protected void printUsage() {
-    printUsage(null);
+    printUsage(this.getClass().getName(), null);
+  }
+
+  protected static void printUsage(final String message) {
+    printUsage(PerformanceEvaluation.class.getName(), message);
+  }
+
+  protected static void printUsageAndExit(final String message, final int exitCode) {
+    printUsage(message);
+    System.exit(exitCode);
   }
 
-  protected void printUsage(final String message) {
+  protected static void printUsage(final String className, final String message) {
     if (message != null && message.length() > 0) {
       System.err.println(message);
     }
-    System.err.println("Usage: java " + this.getClass().getName() + " \\");
+    System.err.println("Usage: java " + className + " \\");
     System.err.println("  <OPTIONS> [-D<property=value>]* <command> <nclients>");
     System.err.println();
     System.err.println("Options:");
@@ -1658,6 +1705,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
     System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
     System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
     System.err.println(" randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
+    System.err.println(" columns         Columns to write per row. Default: 1");
+    System.err.println(" caching         Scan caching to use. Default: 30");
     System.err.println();
     System.err.println(" Note: -D properties will be applied to the conf used. ");
     System.err.println("  For example: ");
@@ -1675,8 +1724,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     System.err.println("                 running: 1 <= value <= 500");
     System.err.println("Examples:");
     System.err.println(" To run a single evaluation client:");
-    System.err.println(" $ bin/hbase " + this.getClass().getName()
-        + " sequentialWrite 1");
+    System.err.println(" $ bin/hbase " + className + " sequentialWrite 1");
   }
 
   /**
@@ -1876,10 +1924,22 @@ public class PerformanceEvaluation extends Configured implements Tool {
         continue;
       }
 
+      final String columns = "--columns=";
+      if (cmd.startsWith(columns)) {
+        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
+        continue;
+      }
+
+      final String caching = "--caching=";
+      if (cmd.startsWith(caching)) {
+        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
+        continue;
+      }
+
       if (isCommandClass(cmd)) {
         opts.cmdName = cmd;
         opts.numClientThreads = Integer.parseInt(args.remove());
-        int rowsPerGB = ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize);
+        int rowsPerGB = getRowsPerGB(opts);
         if (opts.size != DEFAULT_OPTS.size &&
             opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
           throw new IllegalArgumentException(rows + " and " + size + " are mutually exclusive arguments.");
@@ -1894,6 +1954,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
           opts.size = opts.totalRows / rowsPerGB;
         }
         break;
+      } else {
+        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
       }
 
       // Not matching any option or command.
@@ -1904,6 +1966,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
     return opts;
   }
 
+  static int getRowsPerGB(final TestOptions opts) {
+    return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getColumns());
+  }
+
   @Override
   public int run(String[] args) throws Exception {
     // Process command-line args. TODO: Better cmd-line processing

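A rough sketch of the revised auto-sizing math in getRowsPerGB above, assuming fixed-size (non-random) values; the numbers below are illustrative only:

  rowsPerGB = ONE_GB / (valueSize * columns)

  e.g. with the default 1000-byte values and --columns=10, rowsPerGB = ONE_GB / 10000,
  one tenth of the rows per GB used for the single-column default.
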
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5d8f625/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index 8ed8464..c1ff2ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -123,6 +123,7 @@ public class TestHFileOutputFormat  {
     private int valLength;
     private static final int VALLEN_DEFAULT=10;
     private static final String VALLEN_CONF="randomkv.val.length";
+    private static final byte [] QUALIFIER = Bytes.toBytes("data");
 
     @Override
     protected void setup(Context context) throws IOException,
@@ -157,8 +158,7 @@ public class TestHFileOutputFormat  {
         ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
 
         for (byte[] family : TestHFileOutputFormat.FAMILIES) {
-          KeyValue kv = new KeyValue(keyBytes, family,
-              PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+          KeyValue kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
           context.write(key, kv);
         }
       }
@@ -862,7 +862,7 @@ public class TestHFileOutputFormat  {
 
     int taskId = context.getTaskAttemptID().getTaskID().getId();
     assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
-
+    final byte [] qualifier = Bytes.toBytes("data");
     Random random = new Random();
     for (int i = 0; i < numRows; i++) {
 
@@ -871,8 +871,7 @@ public class TestHFileOutputFormat  {
       ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
 
       for (byte[] family : families) {
-        KeyValue kv = new KeyValue(keyBytes, family,
-            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+        KeyValue kv = new KeyValue(keyBytes, family, qualifier, valBytes);
         writer.write(key, kv);
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a5d8f625/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 2a780d4..c806802 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -120,6 +120,7 @@ public class TestHFileOutputFormat2  {
     private int valLength;
     private static final int VALLEN_DEFAULT=10;
     private static final String VALLEN_CONF="randomkv.val.length";
+    private static final byte [] QUALIFIER = Bytes.toBytes("data");
 
     @Override
     protected void setup(Context context) throws IOException,
@@ -154,8 +155,7 @@ public class TestHFileOutputFormat2  {
         ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
 
         for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
-          Cell kv = new KeyValue(keyBytes, family,
-              PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+          Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
           context.write(key, kv);
         }
       }
@@ -862,7 +862,7 @@ public class TestHFileOutputFormat2  {
 
     int taskId = context.getTaskAttemptID().getTaskID().getId();
     assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
-
+    final byte [] qualifier = Bytes.toBytes("data");
     Random random = new Random();
     for (int i = 0; i < numRows; i++) {
 
@@ -871,8 +871,7 @@ public class TestHFileOutputFormat2  {
       ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
 
       for (byte[] family : families) {
-        Cell kv = new KeyValue(keyBytes, family,
-            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
         writer.write(key, kv);
       }
     }