You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2013/04/29 20:26:44 UTC
svn commit: r1477250 - in /hbase/branches/0.95/hbase-server: pom.xml
src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
Author: eclark
Date: Mon Apr 29 18:26:44 2013
New Revision: 1477250
URL: http://svn.apache.org/r1477250
Log:
HBASE-8407 Remove Async HBase
Modified:
hbase/branches/0.95/hbase-server/pom.xml
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
Modified: hbase/branches/0.95/hbase-server/pom.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/pom.xml?rev=1477250&r1=1477249&r2=1477250&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/pom.xml (original)
+++ hbase/branches/0.95/hbase-server/pom.xml Mon Apr 29 18:26:44 2013
@@ -459,34 +459,6 @@
<groupId>stax</groupId>
<artifactId>stax-api</artifactId>
</dependency>
- <dependency>
- <groupId>org.hbase</groupId>
- <artifactId>asynchbase</artifactId>
- <version>[1.3.1,)</version>
- <!--
- This is needed otherwise Maven complains because asynchbase depends on SLF4J 1.6:
- "The requested version 1.5.8 by your slf4j binding is not compatible with [1.6]"
- See http://stackoverflow.com/questions/5477942/slf4j-version-problem-while-building-through-the-maven
- Note that we can't do what Ceki suggests here, because v1.6 removed some interface
- that the Hadoop jar calls into, so we have to stick to the 1.5 version they pull.
- -->
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <!-- We also have to exclude the other slf4j libraries pulled by asynchbase. -->
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </exclusion>
- </exclusions>
- <scope>test</scope>
- </dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>org.codehaus.jettison</groupId>
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=1477250&r1=1477249&r2=1477250&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java Mon Apr 29 18:26:44 2013
@@ -87,13 +87,6 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.LineReader;
-import com.stumbleupon.async.Callback;
-import com.stumbleupon.async.Deferred;
-import org.hbase.async.GetRequest;
-import org.hbase.async.HBaseClient;
-import org.hbase.async.PleaseThrottleException;
-import org.hbase.async.PutRequest;
-import org.hbase.async.Scanner;
/**
* Script used evaluating HBase performance and scalability. Runs a HBase
@@ -172,8 +165,6 @@ public class PerformanceEvaluation exten
addCommandDescriptor(RandomReadTest.class, "randomRead",
"Run random read test");
- addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
- "Run random read test with asynchbase");
addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
"Run random seek and scan 100 test");
addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
@@ -186,20 +177,12 @@ public class PerformanceEvaluation exten
"Run random seek scan with both start and stop row (max 10000 rows)");
addCommandDescriptor(RandomWriteTest.class, "randomWrite",
"Run random write test");
- addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
- "Run random write test with asynchbase");
addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
"Run sequential read test");
- addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
- "Run sequential read test with asynchbase");
addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
"Run sequential write test");
- addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
- "Run sequential write test with asynchbase");
addCommandDescriptor(ScanTest.class, "scan",
"Run scan test (read every row)");
- addCommandDescriptor(AsyncScanTest.class, "asyncScan",
- "Run scan test with asynchbase (read every row)");
addCommandDescriptor(FilteredScanTest.class, "filterScan",
"Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)");
}
@@ -908,177 +891,6 @@ public class PerformanceEvaluation exten
abstract void testRow(final int i) throws IOException;
}
- static abstract class AsyncTest extends Test {
- /** Maximum number of RPCs we're allowed in flight at a time. */
- private static final int MAX_OUTSTANDING_RPCS = 200000; // Sized for 2G heap.
-
- private static HBaseClient client; // Only one client regardless of number of threads.
-
- AsyncTest(final Configuration conf, final TestOptions options, final Status status) {
- super(null, options, status);
- final String zkquorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
- final String znode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
- HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
- synchronized (AsyncTest.class) {
- if (client == null) {
- client = new HBaseClient(zkquorum, znode);
- // Sanity check.
- try {
- client.ensureTableFamilyExists(tableName, FAMILY_NAME).joinUninterruptibly();
- } catch (Exception e) {
- throw new RuntimeException("Missing test table/family?", e);
- }
- }
- }
- latch = new CountDownLatch(super.perClientRunRows);
- final int maxrpcs = MAX_OUTSTANDING_RPCS / options.getNumClientThreads();
- sem = new Semaphore(Math.max(100, maxrpcs));
- }
-
- /**
- * If true, make sure that every read returns a valid-looking KeyValue.
- */
- private static final boolean CHECK_READS = false;
-
- /** Checks that the row retrieved from HBase looks valid. */
- protected static void check(final ArrayList<org.hbase.async.KeyValue> row) throws IOException {
- if (!CHECK_READS) {
- return;
- }
- if (row.size() != 1) {
- throw new IOException((row.isEmpty() ? "No" : "Multiple (" + row.size() + ')')
- + " KeyValue found in row");
- } else if (row.get(0).value().length != VALUE_LENGTH) {
- throw new IOException("Invalid value length (found: " + row.get(0).value().length
- + ", expected: " + VALUE_LENGTH + ") in row \""
- + new String(row.get(0).key()) + '"');
- }
- }
-
- private Exception error = null; // Last exception caught asynchronously.
- private volatile boolean failed = false; // True if we caught an exception asynchronously.
- /** Used by sub-classes to handle asynchronous exceptions. */
- protected final Callback<Exception, Exception> errback = new Callback<Exception, Exception>() {
- public Exception call(final Exception e) throws Exception {
- rpcCompleted();
- if (e instanceof PleaseThrottleException) {
- LOG.warn("Throttling thread " + Thread.currentThread().getName()
- + ", HBase isn't keeping up", e);
- final int permits = sem.drainPermits(); // Prevent creation of further RPCs.
- ((PleaseThrottleException) e).getDeferred().addBoth(new Callback<Object, Object>() {
- public Object call(final Object arg) {
- sem.release(permits);
- LOG.warn("Done throttling thread " + Thread.currentThread().getName());
- return arg;
- }
- public String toString() {
- return "error recovery after " + e;
- }
- });
- return null;
- }
- error = e;
- failed = true; // Volatile-write.
- LOG.error(this + " caught an exception", e);
- return e;
- }
-
- private final String toString = "errback for " + AsyncTest.this + " in " + Thread.currentThread().getName();
- public String toString() {
- return toString;
- }
- };
-
- /**
- * Latch to guarantee we have gotten a response for every single RPC sent.
- * This latch is initialized up with the number of RPCs we intend to send.
- * Every time an RPC completes successfully, we decrement its count down
- * by one. This way we guarantee that all RPCs have completed and their
- * responses have been handled within the section of the code we're
- * timing.
- */
- private final CountDownLatch latch;
-
- /**
- * Semaphore to control the number of outstanding RPCs.
- * Because the producer code is synchronous and asynchbase is
- * non-blocking, the tests will try to create and send all RPCs at once,
- * thus running the app out of memory. In order to limit the number of
- * RPCs in flight at the same time, we acquire a permit from this
- * semaphore each time we access the client to send an RPC, and we release
- * the permit each time the RPC completes.
- */
- private final Semaphore sem;
-
- /** Records the completion of an RPC. */
- protected final void rpcCompleted() {
- sem.release();
- latch.countDown();
- }
-
- /** Callback used on successful read RPCs. */
- protected final Callback<Object, ArrayList<org.hbase.async.KeyValue>> readCallback = new Callback<Object, ArrayList<org.hbase.async.KeyValue>>() {
- public Object call(final ArrayList<org.hbase.async.KeyValue> row) throws IOException {
- rpcCompleted();
- check(row);
- return row;
- }
-
- private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName();
- public String toString() {
- return toString;
- }
- };
-
- /** Callback used on other successful RPCs. */
- protected final Callback<Object, Object> callback = new Callback<Object, Object>() {
- public Object call(final Object arg) {
- rpcCompleted();
- return arg;
- }
-
- private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName();
- public String toString() {
- return toString;
- }
- };
-
- @Override
- final void testSetup() {
- // Nothing.
- }
-
- @Override
- final void testTakedown() throws IOException {
- try {
- // For tests with few writes, asking for a flush before waiting on the
- // latch tells asynchbase to start flushing writes instead of waiting
- // until the timer flushes them.
- client.flush().join();
- latch.await(); // Make sure the last RPC completed.
- if (failed) { // Volatile-read
- throw error;
- }
- } catch (RuntimeException e) {
- throw e;
- } catch (IOException e) {
- throw e;
- } catch (Exception e) {
- throw new IOException("Uncaught exception from flush()", e);
- }
- }
-
- /** Returns the client to use to send an RPC. Call once per RPC. */
- protected final HBaseClient client() {
- try {
- sem.acquire();
- } catch (InterruptedException e) {
- LOG.error("Shouldn't happen!", e);
- return null;
- }
- return client;
- }
- }
@SuppressWarnings("unused")
static class RandomSeekScanTest extends Test {
@@ -1212,27 +1024,6 @@ public class PerformanceEvaluation exten
}
- static class AsyncRandomReadTest extends AsyncTest {
- AsyncRandomReadTest(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
- }
-
- @Override
- void testRow(final int i) throws IOException {
- final GetRequest get = new GetRequest(tableName, getRandomRow(this.rand, this.totalRows));
- get.family(FAMILY_NAME).qualifier(QUALIFIER_NAME);
-
- client().get(get).addCallback(readCallback).addErrback(errback);
- }
-
- @Override
- protected int getReportingPeriod() {
- int period = this.perClientRunRows / 100;
- return period == 0 ? this.perClientRunRows : period;
- }
-
- }
-
static class RandomWriteTest extends Test {
RandomWriteTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
@@ -1249,21 +1040,6 @@ public class PerformanceEvaluation exten
}
}
- static class AsyncRandomWriteTest extends AsyncTest {
- AsyncRandomWriteTest(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
- }
-
- @Override
- void testRow(final int i) {
- final PutRequest put = new PutRequest(tableName, getRandomRow(this.rand, this.totalRows),
- FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand));
- put.setDurable(writeToWAL);
- put.setBufferable(flushCommits);
- client().put(put).addCallbacks(callback, errback);
- }
-
- }
static class ScanTest extends Test {
private ResultScanner testScanner;
@@ -1293,50 +1069,6 @@ public class PerformanceEvaluation exten
}
- static class AsyncScanTest extends AsyncTest {
- private final Scanner scanner;
- private final Callback continueScan = new Callback<Object, ArrayList<ArrayList<org.hbase.async.KeyValue>>>() {
- public Object call(final ArrayList<ArrayList<org.hbase.async.KeyValue>> rows) throws Exception {
- if (rows != null) {
- testTimed();
- for (final ArrayList<org.hbase.async.KeyValue> row : rows) {
- int n = row.size();
- while (n-- >= 0) {
- rpcCompleted();
- }
- }
- for (final ArrayList<org.hbase.async.KeyValue> row : rows) {
- check(row); // Do this separate as it might throw.
- }
- } // else arg is null, we're done scanning.
- return rows;
- }
- public String toString() {
- return "continueScan on " + scanner;
- }
- };
-
- AsyncScanTest(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
- scanner = client().newScanner(tableName);
- scanner.setStartKey(format(this.startRow));
- scanner.setFamily(FAMILY_NAME);
- scanner.setQualifier(QUALIFIER_NAME);
- }
-
- @Override
- void testTimed() {
- scanner.nextRows()
- .addCallback(continueScan)
- .addCallbacks(callback, errback);
- }
-
- @Override
- void testRow(final int i) {
- // Unused because we completely redefined testTimed().
- }
- }
-
static class SequentialReadTest extends Test {
SequentialReadTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
@@ -1351,20 +1083,6 @@ public class PerformanceEvaluation exten
}
- static class AsyncSequentialReadTest extends AsyncTest {
- AsyncSequentialReadTest(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
- }
-
- @Override
- void testRow(final int i) throws IOException {
- final GetRequest get = new GetRequest(tableName, format(i));
- get.family(FAMILY_NAME).qualifier(QUALIFIER_NAME);
- client().get(get).addCallback(readCallback).addErrback(errback);
- }
-
- }
-
static class SequentialWriteTest extends Test {
SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
super(conf, options, status);
@@ -1381,22 +1099,6 @@ public class PerformanceEvaluation exten
}
- static class AsyncSequentialWriteTest extends AsyncTest {
- AsyncSequentialWriteTest(Configuration conf, TestOptions options, Status status) {
- super(conf, options, status);
- }
-
- @Override
- void testRow(final int i) {
- final PutRequest put = new PutRequest(tableName, format(i),
- FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand));
- put.setDurable(writeToWAL);
- put.setBufferable(flushCommits);
- client().put(put).addCallbacks(callback, errback);
- }
-
- }
-
static class FilteredScanTest extends Test {
protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());