You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2013/04/29 20:26:44 UTC

svn commit: r1477250 - in /hbase/branches/0.95/hbase-server: pom.xml src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java

Author: eclark
Date: Mon Apr 29 18:26:44 2013
New Revision: 1477250

URL: http://svn.apache.org/r1477250
Log:
HBASE-8407 Remove Async HBase

Modified:
    hbase/branches/0.95/hbase-server/pom.xml
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java

Modified: hbase/branches/0.95/hbase-server/pom.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/pom.xml?rev=1477250&r1=1477249&r2=1477250&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/pom.xml (original)
+++ hbase/branches/0.95/hbase-server/pom.xml Mon Apr 29 18:26:44 2013
@@ -459,34 +459,6 @@
       <groupId>stax</groupId>
       <artifactId>stax-api</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.hbase</groupId>
-      <artifactId>asynchbase</artifactId>
-      <version>[1.3.1,)</version>
-      <!--
-        This is needed otherwise Maven complains because asynchbase depends on SLF4J 1.6:
-        "The requested version 1.5.8 by your slf4j binding is not compatible with [1.6]"
-        See http://stackoverflow.com/questions/5477942/slf4j-version-problem-while-building-through-the-maven
-        Note that we can't do what Ceki suggests here, because v1.6 removed some interface
-        that the Hadoop jar calls into, so we have to stick to the 1.5 version they pull.
-      -->
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-api</artifactId>
-        </exclusion>
-        <!-- We also have to exclude the other slf4j libraries pulled by asynchbase.  -->
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>jcl-over-slf4j</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>log4j-over-slf4j</artifactId>
-        </exclusion>
-      </exclusions>
-      <scope>test</scope>
-    </dependency>
     <!-- Test Dependencies -->
     <dependency>
       <groupId>org.codehaus.jettison</groupId>

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=1477250&r1=1477249&r2=1477250&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java Mon Apr 29 18:26:44 2013
@@ -87,13 +87,6 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.util.LineReader;
 
-import com.stumbleupon.async.Callback;
-import com.stumbleupon.async.Deferred;
-import org.hbase.async.GetRequest;
-import org.hbase.async.HBaseClient;
-import org.hbase.async.PleaseThrottleException;
-import org.hbase.async.PutRequest;
-import org.hbase.async.Scanner;
 
 /**
  * Script used evaluating HBase performance and scalability.  Runs a HBase
@@ -172,8 +165,6 @@ public class PerformanceEvaluation exten
 
     addCommandDescriptor(RandomReadTest.class, "randomRead",
         "Run random read test");
-    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
-        "Run random read test with asynchbase");
     addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
         "Run random seek and scan 100 test");
     addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
@@ -186,20 +177,12 @@ public class PerformanceEvaluation exten
         "Run random seek scan with both start and stop row (max 10000 rows)");
     addCommandDescriptor(RandomWriteTest.class, "randomWrite",
         "Run random write test");
-    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
-        "Run random write test with asynchbase");
     addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
         "Run sequential read test");
-    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
-        "Run sequential read test with asynchbase");
     addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
         "Run sequential write test");
-    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
-        "Run sequential write test with asynchbase");
     addCommandDescriptor(ScanTest.class, "scan",
         "Run scan test (read every row)");
-    addCommandDescriptor(AsyncScanTest.class, "asyncScan",
-        "Run scan test with asynchbase (read every row)");
     addCommandDescriptor(FilteredScanTest.class, "filterScan",
         "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)");
   }
@@ -908,177 +891,6 @@ public class PerformanceEvaluation exten
     abstract void testRow(final int i) throws IOException;
   }
 
-  static abstract class AsyncTest extends Test {
-    /** Maximum number of RPCs we're allowed in flight at a time.  */
-    private static final int MAX_OUTSTANDING_RPCS = 200000;  // Sized for 2G heap.
-
-    private static HBaseClient client;  // Only one client regardless of number of threads.
-
-    AsyncTest(final Configuration conf, final TestOptions options, final Status status) {
-      super(null, options, status);
-      final String zkquorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
-      final String znode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
-                                    HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
-      synchronized (AsyncTest.class) {
-        if (client == null) {
-          client = new HBaseClient(zkquorum, znode);
-          // Sanity check.
-          try {
-            client.ensureTableFamilyExists(tableName, FAMILY_NAME).joinUninterruptibly();
-          } catch (Exception e) {
-            throw new RuntimeException("Missing test table/family?", e);
-          }
-        }
-      }
-      latch = new CountDownLatch(super.perClientRunRows);
-      final int maxrpcs = MAX_OUTSTANDING_RPCS / options.getNumClientThreads();
-      sem = new Semaphore(Math.max(100, maxrpcs));
-    }
-
-    /**
-     * If true, make sure that every read returns a valid-looking KeyValue.
-     */
-    private static final boolean CHECK_READS = false;
-
-    /** Checks that the row retrieved from HBase looks valid.  */
-    protected static void check(final ArrayList<org.hbase.async.KeyValue> row) throws IOException {
-      if (!CHECK_READS) {
-        return;
-      }
-      if (row.size() != 1) {
-        throw new IOException((row.isEmpty() ? "No" : "Multiple (" + row.size() + ')')
-                              + " KeyValue found in row");
-      } else if (row.get(0).value().length != VALUE_LENGTH) {
-        throw new IOException("Invalid value length (found: " + row.get(0).value().length
-                              + ", expected: " + VALUE_LENGTH + ") in row \""
-                              + new String(row.get(0).key()) + '"');
-      }
-    }
-
-    private Exception error = null;  // Last exception caught asynchronously.
-    private volatile boolean failed = false;  // True if we caught an exception asynchronously.
-    /** Used by sub-classes to handle asynchronous exceptions.  */
-    protected final Callback<Exception, Exception> errback = new Callback<Exception, Exception>() {
-      public Exception call(final Exception e) throws Exception {
-        rpcCompleted();
-        if (e instanceof PleaseThrottleException) {
-          LOG.warn("Throttling thread " + Thread.currentThread().getName()
-                   + ", HBase isn't keeping up", e);
-          final int permits = sem.drainPermits();  // Prevent creation of further RPCs.
-          ((PleaseThrottleException) e).getDeferred().addBoth(new Callback<Object, Object>() {
-            public Object call(final Object arg) {
-              sem.release(permits);
-              LOG.warn("Done throttling thread " + Thread.currentThread().getName());
-              return arg;
-            }
-            public String toString() {
-              return "error recovery after " + e;
-            }
-          });
-          return null;
-        }
-        error = e;
-        failed = true;  // Volatile-write.
-        LOG.error(this + " caught an exception", e);
-        return e;
-      }
-
-      private final String toString = "errback for " + AsyncTest.this + " in " + Thread.currentThread().getName();
-      public String toString() {
-        return toString;
-      }
-    };
-
-    /**
-     * Latch to guarantee we have gotten a response for every single RPC sent.
-     * This latch is initialized up with the number of RPCs we intend to send.
-     * Every time an RPC completes successfully, we decrement its count down
-     * by one.  This way we guarantee that all RPCs have completed and their
-     * responses have been handled within the section of the code we're
-     * timing.
-     */
-    private final CountDownLatch latch;
-
-    /**
-     * Semaphore to control the number of outstanding RPCs.
-     * Because the producer code is synchronous and asynchbase is
-     * non-blocking, the tests will try to create and send all RPCs at once,
-     * thus running the app out of memory.  In order to limit the number of
-     * RPCs in flight at the same time, we acquire a permit from this
-     * semaphore each time we access the client to send an RPC, and we release
-     * the permit each time the RPC completes.
-     */
-    private final Semaphore sem;
-
-    /** Records the completion of an RPC.  */
-    protected final void rpcCompleted() {
-      sem.release();
-      latch.countDown();
-    }
-
-    /** Callback used on successful read RPCs.  */
-    protected final Callback<Object, ArrayList<org.hbase.async.KeyValue>> readCallback = new Callback<Object, ArrayList<org.hbase.async.KeyValue>>() {
-      public Object call(final ArrayList<org.hbase.async.KeyValue> row) throws IOException {
-        rpcCompleted();
-        check(row);
-        return row;
-      }
-
-      private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName();
-      public String toString() {
-        return toString;
-      }
-    };
-
-    /** Callback used on other successful RPCs.  */
-    protected final Callback<Object, Object> callback = new Callback<Object, Object>() {
-      public Object call(final Object arg) {
-        rpcCompleted();
-        return arg;
-      }
-
-      private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName();
-      public String toString() {
-        return toString;
-      }
-    };
-
-    @Override
-    final void testSetup() {
-      // Nothing.
-    }
-
-    @Override
-    final void testTakedown() throws IOException {
-      try {
-        // For tests with few writes, asking for a flush before waiting on the
-        // latch tells asynchbase to start flushing writes instead of waiting
-        // until the timer flushes them.
-        client.flush().join();
-        latch.await();  // Make sure the last RPC completed.
-        if (failed) {  // Volatile-read
-          throw error;
-        }
-      } catch (RuntimeException e) {
-        throw e;
-      } catch (IOException e) {
-        throw e;
-      } catch (Exception e) {
-        throw new IOException("Uncaught exception from flush()", e);
-      }
-    }
-
-    /** Returns the client to use to send an RPC.  Call once per RPC.  */
-    protected final HBaseClient client() {
-      try {
-        sem.acquire();
-      } catch (InterruptedException e) {
-        LOG.error("Shouldn't happen!", e);
-        return null;
-      }
-      return client;
-    }
-  }
 
   @SuppressWarnings("unused")
   static class RandomSeekScanTest extends Test {
@@ -1212,27 +1024,6 @@ public class PerformanceEvaluation exten
 
   }
 
-  static class AsyncRandomReadTest extends AsyncTest {
-    AsyncRandomReadTest(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
-    }
-
-    @Override
-    void testRow(final int i) throws IOException {
-      final GetRequest get = new GetRequest(tableName, getRandomRow(this.rand, this.totalRows));
-      get.family(FAMILY_NAME).qualifier(QUALIFIER_NAME);
-
-      client().get(get).addCallback(readCallback).addErrback(errback);
-    }
-
-    @Override
-    protected int getReportingPeriod() {
-      int period = this.perClientRunRows / 100;
-      return period == 0 ? this.perClientRunRows : period;
-    }
-
-  }
-
   static class RandomWriteTest extends Test {
     RandomWriteTest(Configuration conf, TestOptions options, Status status) {
       super(conf, options, status);
@@ -1249,21 +1040,6 @@ public class PerformanceEvaluation exten
     }
   }
 
-  static class AsyncRandomWriteTest extends AsyncTest {
-    AsyncRandomWriteTest(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
-    }
-
-    @Override
-    void testRow(final int i) {
-      final PutRequest put = new PutRequest(tableName, getRandomRow(this.rand, this.totalRows),
-                                            FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand));
-      put.setDurable(writeToWAL);
-      put.setBufferable(flushCommits);
-      client().put(put).addCallbacks(callback, errback);
-    }
-
-  }
 
   static class ScanTest extends Test {
     private ResultScanner testScanner;
@@ -1293,50 +1069,6 @@ public class PerformanceEvaluation exten
 
   }
 
-  static class AsyncScanTest extends AsyncTest {
-    private final Scanner scanner;
-    private final Callback continueScan = new Callback<Object, ArrayList<ArrayList<org.hbase.async.KeyValue>>>() {
-      public Object call(final ArrayList<ArrayList<org.hbase.async.KeyValue>> rows) throws Exception {
-        if (rows != null) {
-          testTimed();
-          for (final ArrayList<org.hbase.async.KeyValue> row : rows) {
-            int n = row.size();
-            while (n-- >= 0) {
-              rpcCompleted();
-            }
-          }
-          for (final ArrayList<org.hbase.async.KeyValue> row : rows) {
-            check(row);  // Do this separate as it might throw.
-          }
-        }  // else arg is null, we're done scanning.
-        return rows;
-      }
-      public String toString() {
-        return "continueScan on " + scanner;
-      }
-    };
-
-    AsyncScanTest(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
-      scanner = client().newScanner(tableName);
-      scanner.setStartKey(format(this.startRow));
-      scanner.setFamily(FAMILY_NAME);
-      scanner.setQualifier(QUALIFIER_NAME);
-    }
-
-    @Override
-    void testTimed() {
-      scanner.nextRows()
-        .addCallback(continueScan)
-        .addCallbacks(callback, errback);
-    }
-
-    @Override
-    void testRow(final int i) {
-      // Unused because we completely redefined testTimed().
-    }
-  }
-
   static class SequentialReadTest extends Test {
     SequentialReadTest(Configuration conf, TestOptions options, Status status) {
       super(conf, options, status);
@@ -1351,20 +1083,6 @@ public class PerformanceEvaluation exten
 
   }
 
-  static class AsyncSequentialReadTest extends AsyncTest {
-    AsyncSequentialReadTest(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
-    }
-
-    @Override
-    void testRow(final int i) throws IOException {
-      final GetRequest get = new GetRequest(tableName, format(i));
-      get.family(FAMILY_NAME).qualifier(QUALIFIER_NAME);
-      client().get(get).addCallback(readCallback).addErrback(errback);
-    }
-
-  }
-
   static class SequentialWriteTest extends Test {
     SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
       super(conf, options, status);
@@ -1381,22 +1099,6 @@ public class PerformanceEvaluation exten
 
   }
 
-  static class AsyncSequentialWriteTest extends AsyncTest {
-    AsyncSequentialWriteTest(Configuration conf, TestOptions options, Status status) {
-      super(conf, options, status);
-    }
-
-    @Override
-    void testRow(final int i) {
-      final PutRequest put = new PutRequest(tableName, format(i),
-                                            FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand));
-      put.setDurable(writeToWAL);
-      put.setBufferable(flushCommits);
-      client().put(put).addCallbacks(callback, errback);
-    }
-
-  }
-
   static class FilteredScanTest extends Test {
     protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());