You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/02/09 00:14:00 UTC

[20/32] hbase git commit: HBASE-15157 Add *PerformanceTest for Append, CheckAnd*

HBASE-15157 Add *PerformanceTest for Append, CheckAnd*


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/81d81c98
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/81d81c98
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/81d81c98

Branch: refs/heads/hbase-12439
Commit: 81d81c9839118113e8076338315bf6c500065c09
Parents: 7239056
Author: stack <st...@apache.org>
Authored: Fri Feb 5 11:18:42 2016 -0800
Committer: stack <st...@apache.org>
Committed: Fri Feb 5 20:33:55 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/IncrementPerformanceTest.java  | 128 ----------------
 .../hadoop/hbase/PerformanceEvaluation.java     | 148 ++++++++++++++++++-
 2 files changed, 142 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/81d81c98/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java
deleted file mode 100644
index aed3d0a..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/IncrementPerformanceTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
-
-
-/**
- * Simple Increments Performance Test. Run this from main. It is to go against a cluster.
- * Presumption is the table exists already. Defaults are a zk ensemble of localhost:2181,
- * a tableName of 'tableName', a column famly name of 'columnFamilyName', with 80 threads by
- * default and 10000 increments per thread. To change any of these configs, pass -DNAME=VALUE as
- * in -DtableName="newTableName". It prints out configuration it is running with at the start and
- * on the end it prints out percentiles.
- */
-public class IncrementPerformanceTest implements Tool {
-  private static final Log LOG = LogFactory.getLog(IncrementPerformanceTest.class);
-  private static final byte [] QUALIFIER = new byte [] {'q'};
-  private Configuration conf;
-  private final MetricRegistry metrics = new MetricRegistry();
-  private static final String TABLENAME = "tableName";
-  private static final String COLUMN_FAMILY = "columnFamilyName";
-  private static final String THREAD_COUNT = "threadCount";
-  private static final int DEFAULT_THREAD_COUNT = 80;
-  private static final String INCREMENT_COUNT = "incrementCount";
-  private static final int DEFAULT_INCREMENT_COUNT = 10000;
-
-  IncrementPerformanceTest() {}
-
-  public int run(final String [] args) throws Exception {
-    Configuration conf = getConf();
-    final TableName tableName = TableName.valueOf(conf.get(TABLENAME), TABLENAME);
-    final byte [] columnFamilyName = Bytes.toBytes(conf.get(COLUMN_FAMILY, COLUMN_FAMILY));
-    int threadCount = conf.getInt(THREAD_COUNT, DEFAULT_THREAD_COUNT);
-    final int incrementCount = conf.getInt(INCREMENT_COUNT, DEFAULT_INCREMENT_COUNT);
-    LOG.info("Running test with " + HConstants.ZOOKEEPER_QUORUM + "=" +
-      getConf().get(HConstants.ZOOKEEPER_QUORUM) + ", tableName=" + tableName +
-      ", columnFamilyName=" + columnFamilyName + ", threadCount=" + threadCount +
-      ", incrementCount=" + incrementCount);
-
-    ExecutorService service = Executors.newFixedThreadPool(threadCount);
-    Set<Future<?>> futures = new HashSet<Future<?>>();
-    final AtomicInteger integer = new AtomicInteger(0); // needed a simple "final" counter
-    while (integer.incrementAndGet() <= threadCount) {
-      futures.add(service.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            try (Connection connection = ConnectionFactory.createConnection(getConf())) {
-              try (Table table = connection.getTable(tableName)) {
-                Timer timer = metrics.timer("increments");
-                for (int i = 0; i < incrementCount; i++) {
-                  byte[] row = Bytes.toBytes(i);
-                  Timer.Context context = timer.time();
-                  try {
-                    table.incrementColumnValue(row, columnFamilyName, QUALIFIER, 1l);
-                  } catch (IOException e) {
-                    // swallow..it's a test.
-                  } finally {
-                    context.stop();
-                  }
-                }
-              }
-            }
-          } catch (IOException ioe) {
-            throw new RuntimeException(ioe);
-          }
-        }
-      }));
-    }
-
-    for(Future<?> future : futures) future.get();
-    service.shutdown();
-    Snapshot s = metrics.timer("increments").getSnapshot();
-    LOG.info(String.format("75th=%s, 95th=%s, 99th=%s", s.get75thPercentile(),
-        s.get95thPercentile(), s.get99thPercentile()));
-    return 0;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return this.conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  public static void main(String[] args) throws Exception {
-    System.exit(ToolRunner.run(HBaseConfiguration.create(), new IncrementPerformanceTest(), args));
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/81d81c98/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index 821b995..651bc86 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -49,19 +49,24 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterAllFilter;
 import org.apache.hadoop.hbase.filter.FilterList;
@@ -165,7 +170,17 @@ public class PerformanceEvaluation extends Configured implements Tool {
       "Run scan test (read every row)");
     addCommandDescriptor(FilteredScanTest.class, "filterScan",
       "Run scan test using a filter to find a specific row based on it's value " +
-        "(make sure to use --rows=20)");
+      "(make sure to use --rows=20)");
+    addCommandDescriptor(IncrementTest.class, "increment",
+      "Increment on each row; clients overlap on keyspace so some concurrent operations");
+    addCommandDescriptor(AppendTest.class, "append",
+      "Append on each row; clients overlap on keyspace so some concurrent operations");
+    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
+      "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
+    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
+      "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
+    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
+      "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
   }
 
   /**
@@ -1089,15 +1104,24 @@ public class PerformanceEvaluation extends Configured implements Tool {
       return (System.nanoTime() - startTime) / 1000000;
     }
 
+    int getStartRow() {
+      return opts.startRow;
+    }
+
+    int getLastRow() {
+      return getStartRow() + opts.perClientRunRows;
+    }
+
     /**
      * Provides an extension point for tests that don't want a per row invocation.
      */
     void testTimed() throws IOException, InterruptedException {
-      int lastRow = opts.startRow + opts.perClientRunRows;
+      int startRow = getStartRow();
+      int lastRow = getLastRow();
       // Report on completion of 1/10th of total.
       for (int ii = 0; ii < opts.cycles; ii++) {
         if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
-        for (int i = opts.startRow; i < lastRow; i++) {
+        for (int i = startRow; i < lastRow; i++) {
           if (i % everyN != 0) continue;
           long startTime = System.nanoTime();
           TraceScope scope = Trace.startSpan("test row", traceSampler);
@@ -1106,15 +1130,16 @@ public class PerformanceEvaluation extends Configured implements Tool {
           } finally {
             scope.close();
           }
-          if ( (i - opts.startRow) > opts.measureAfter) {
+          if ( (i - startRow) > opts.measureAfter) {
             latency.update((System.nanoTime() - startTime) / 1000);
             if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
-              status.setStatus(generateStatus(opts.startRow, i, lastRow));
+              status.setStatus(generateStatus(startRow, i, lastRow));
             }
           }
         }
       }
     }
+
     /**
      * report percentiles of latency
      * @throws IOException
@@ -1456,7 +1481,116 @@ public class PerformanceEvaluation extends Configured implements Tool {
       Result r = testScanner.next();
       updateValueSize(r);
     }
+  }
+
+  /**
+   * Base class for operations that are CAS-like; that read a value and then set it based off what
+   * they read. In this category is increment, append, checkAndPut, etc.
+   *
+   * <p>These operations also want some concurrency going on. Usually when these tests run, they
+   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
+   * same key space. We do this with our getStartRow and getLastRow overrides.
+   */
+  static abstract class CASTableTest extends TableTest {
+    private final byte [] qualifier;
+    CASTableTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
+    }
+
+    byte [] getQualifier() {
+      return this.qualifier;
+    }
+
+    @Override
+    int getStartRow() {
+      return 0;
+    }
+
+    @Override
+    int getLastRow() {
+      return opts.perClientRunRows;
+    }
+  }
+
+  static class IncrementTest extends CASTableTest {
+    IncrementTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      Increment increment = new Increment(format(i));
+      increment.addColumn(FAMILY_NAME, getQualifier(), 1l);
+      updateValueSize(this.table.increment(increment));
+    }
+  }
+
+  static class AppendTest extends CASTableTest {
+    AppendTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      byte [] bytes = format(i);
+      Append append = new Append(bytes);
+      append.add(FAMILY_NAME, getQualifier(), bytes);
+      updateValueSize(this.table.append(append));
+    }
+  }
+
+  static class CheckAndMutateTest extends CASTableTest {
+    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      byte [] bytes = format(i);
+      // Put a known value so when we go to check it, it is there.
+      Put put = new Put(bytes);
+      put.addColumn(FAMILY_NAME, getQualifier(), bytes);
+      this.table.put(put);
+      RowMutations mutations = new RowMutations(bytes);
+      mutations.add(put);
+      this.table.checkAndMutate(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes,
+          mutations);
+    }
+  }
 
+  static class CheckAndPutTest extends CASTableTest {
+    CheckAndPutTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      byte [] bytes = format(i);
+      // Put a known value so when we go to check it, it is there.
+      Put put = new Put(bytes);
+      put.addColumn(FAMILY_NAME, getQualifier(), bytes);
+      this.table.put(put);
+      this.table.checkAndPut(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, put);
+    }
+  }
+
+  static class CheckAndDeleteTest extends CASTableTest {
+    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      byte [] bytes = format(i);
+      // Put a known value so when we go to check it, it is there.
+      Put put = new Put(bytes);
+      put.addColumn(FAMILY_NAME, getQualifier(), bytes);
+      this.table.put(put);
+      Delete delete = new Delete(put.getRow());
+      delete.addColumn(FAMILY_NAME, getQualifier());
+      this.table.checkAndDelete(bytes, FAMILY_NAME, getQualifier(), CompareOp.EQUAL, bytes, delete);
+    }
   }
 
   static class SequentialReadTest extends TableTest {
@@ -1760,8 +1894,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
       "clients (and HRegionServers)");
     System.err.println("                 running: 1 <= value <= 500");
     System.err.println("Examples:");
-    System.err.println(" To run a single evaluation client:");
+    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
     System.err.println(" $ bin/hbase " + className + " sequentialWrite 1");
+    System.err.println(" To run 10 clients doing increments over ten rows:");
+    System.err.println(" $ bin/hbase " + className + " --rows=10 --nomapred increment 10");
   }
 
   /**