You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/17 02:48:37 UTC
svn commit: r1588111 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hbase/
test/java/org/apache/hadoop/hbase/client/
Author: liyin
Date: Thu Apr 17 00:48:37 2014
New Revision: 1588111
URL: http://svn.apache.org/r1588111
Log:
[HBASE-10709] Fix the bug of a rejection exception being thrown in HTableClientScanner when too many scanners are running at the same time.
Author: daviddeng
Summary:
`SynchronousQueue` is replaced with `LinkedBlockingQueue`
Background thread is no longer blocked waiting for main thread to consume the data.
Test Plan: `TestHTableClientScanner`
Reviewers: liyintang, manukranthk, elliott, ehwang
Reviewed By: ehwang
CC: hbase-eng@, elliott, ehwang
Differential Revision: https://phabricator.fb.com/D1271322
Task ID: 4073520
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java?rev=1588111&r1=1588110&r2=1588111&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java Thu Apr 17 00:48:37 2014
@@ -24,9 +24,9 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,13 +36,14 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
/**
* Implements the scanner interface for the HBase client.
* If there are multiple regions in a table, this scanner will iterate
* through them all.
*/
-public class HTableClientScanner implements ResultScanner, Runnable {
+public class HTableClientScanner implements ResultScanner {
private static final Log LOG = LogFactory.getLog(HTableClientScanner.class);
// End of Scanning
private static final Result[] EOS = new Result[0];
@@ -50,14 +51,10 @@ public class HTableClientScanner impleme
private static final int MAX_THREADS_IN_POOL = Runtime.getRuntime()
.availableProcessors();
- private static final ExecutorService executor = new ThreadPoolExecutor(1,
- MAX_THREADS_IN_POOL, 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
-
- // HEADSUP: The scan internal start row can change as we move through table.
- protected final Scan scan;
- // The number of prefetched and cached results
- private final int caching;
+ private static final ExecutorService executor = Executors.newFixedThreadPool(
+ MAX_THREADS_IN_POOL, new DaemonThreadFactory(
+ "HTableClientScanner.Fetching."));
+
// Temporary results list in main thread, may be null
private Result[] currentResults;
// The position of next unfetched results in currentResults if it is
@@ -65,37 +62,44 @@ public class HTableClientScanner impleme
private int currentPos;
// Whether this client has closed.
private boolean closed;
- /**
- * The queue transferring fetched Result[] to main thread.
- * When queue.take() returns an EOS, scanning ends.
- */
+ // The queue transferring fetched Result[] to main thread.
+ // When queue.take() returns an EOS, scanning ends.
private final ArrayBlockingQueue<Result[]> queue;
+ // A place storing a Result[] in case the queue is full. It is set only in the
+ // fetcher thread and is cleared by whichever thread moves it into the queue.
+ private final AtomicReference<Result[]> justFetched = new AtomicReference<>();
+ // Contains exception thrown in fetcher thread.
+ private final AtomicReference<Throwable> exception = new AtomicReference<>();
// The variable informing fetching thread to stop
- private volatile boolean closing;
- // Contains the exception caught in fetch thread.
- private volatile Throwable exception;
+ private final AtomicBoolean closing = new AtomicBoolean(false);
- private final HTable table;
+ private final Fetcher fetcher;
/**
* Constructor.
+ *
+ * @param scan The scan to run; its internal start row can change as we move
+ * through the table.
*/
- public HTableClientScanner(Scan scan, HTable table) {
- this.scan = scan;
- this.table = table;
+ public HTableClientScanner(Scan scan, HTable table) throws IOException {
this.queue = new ArrayBlockingQueue<>(table.getConfiguration().getInt(
HConstants.HBASE_CLIENT_SCANNER_QUEUE_LENGTH,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_QUEUE_LENGTH));
+ int caching;
if (scan.getCaching() > 0) {
- this.caching = scan.getCaching();
+ caching = scan.getCaching();
} else {
- this.caching = table.getScannerCaching();
+ caching = table.getScannerCaching();
}
+
+ fetcher = new Fetcher(table, scan, caching, queue, justFetched, exception,
+ closing);
}
HTableClientScanner initialize() {
- executor.execute(this);
+ executor.execute(fetcher);
+
return this;
}
@@ -117,7 +121,7 @@ public class HTableClientScanner impleme
}
/**
- * Fetches results from queue to currentResults if it is not null.
+ * Fetches results from queue to currentResults if it is null.
*
* @return true if more results available, false if end of scanning
*/
@@ -132,27 +136,39 @@ public class HTableClientScanner impleme
try {
currentResults = queue.take();
+
if (currentResults.length == 0) {
// End of scanning
closed = true;
currentResults = null;
- if (exception != null) {
-
+ Throwable e = this.exception.get();
+ if (e != null) {
// Failure of scanning
- throwIOException(exception);
+ throwIOException(e);
}
return false;
}
- // Results fetched
- currentPos = 0;
- return true;
+ Result[] jf = justFetched.getAndSet(null);
+ if (jf != null) {
+ // Something was put in justFetched because the queue was full when those
+ // results were fetched. The fetching task should not be running now.
+ queue.add(jf);
+ if (jf.length > 0) {
+ // We may have more results
+ executor.execute(fetcher);
+ }
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
+
+ // Results fetched
+ currentPos = 0;
+ return true;
}
@Override
@@ -212,7 +228,7 @@ public class HTableClientScanner impleme
if (this.closed) {
return;
}
- this.closing = true;
+ this.closing.set(true);
try {
while (fetchFromQueue()) {
// skip all results
@@ -224,137 +240,219 @@ public class HTableClientScanner impleme
}
}
- private Result[] call(ScannerCallable callable) throws IOException {
- return table.getConnectionAndResetOperationContext()
- .getRegionServerWithRetries(callable);
+ @Override
+ public boolean isClosed() {
+ return closed;
}
- // Returns a ScannerCallable with a start key
- private ScannerCallable getScannerCallable(byte[] startKey) {
- scan.setStartRow(startKey);
- ScannerCallable s = new ScannerCallable(
- table.getConnectionAndResetOperationContext(),
- table.getTableNameStringBytes(),
- scan, table.getOptions());
- s.setCaching(caching);
- return s;
- }
+ private static class Fetcher implements Runnable {
+ // The startKey for opening a scanner.
+ private byte[] startKey;
+ // The callable for scanning
+ private ScannerCallable callable;
+ // Current scanning region info.
+ private HRegionInfo currentRegion;
+ // Timestamp of last successful scan
+ private long lastSuccNextTs;
+ // The last result returned.
+ private Result lastRes = null;
+
+ private final HTable table;
+ private final Scan scan;
+ private final int caching;
+
+ private final ArrayBlockingQueue<Result[]> queue;
+ private final AtomicReference<Result[]> justFetched;
+ private final AtomicReference<Throwable> exception;
+ private final AtomicBoolean closing;
+
+ public Fetcher(HTable table, Scan scan, int caching,
+ ArrayBlockingQueue<Result[]> queue,
+ AtomicReference<Result[]> justFetched,
+ AtomicReference<Throwable> exception, AtomicBoolean closing) {
+ this.table = table;
+ this.scan = scan;
+ this.caching = caching;
+
+ this.queue = queue;
+ this.justFetched = justFetched;
+ this.exception = exception;
+ this.closing = closing;
+
+ // Initialize startKey
+ startKey = scan.getStartRow();
+ if (startKey == null) {
+ // In case startKey == null, set it to zero-length byte array since
+ // null means end-of-scan.
+ startKey = HConstants.EMPTY_BYTE_ARRAY;
+ }
+ }
- // Closes a callable silently.
- private void closeScanner(ScannerCallable callable) {
- callable.setClose();
- try {
- call(callable);
- } catch (IOException e) {
- // We used to catch this error, interpret, and rethrow. However, we
- // have since decided that it's not nice for a scanner's close to
- // throw exceptions. Chances are it was just an UnknownScanner
- // exception due to lease time out.
- LOG.error("Exception caught during closeScanner", e);
+ private Result[] call(ScannerCallable callable) throws IOException {
+ return table.getConnectionAndResetOperationContext()
+ .getRegionServerWithRetries(callable);
+ }
+
+ // Returns a ScannerCallable with a start key
+ private ScannerCallable getScannerCallable(byte[] startKey) {
+ scan.setStartRow(startKey);
+ ScannerCallable s =
+ new ScannerCallable(table.getConnectionAndResetOperationContext(),
+ table.getTableNameStringBytes(), scan, table.getOptions());
+ s.setCaching(caching);
+ return s;
+ }
+
+ // Closes a callable silently.
+ private void closeScanner(ScannerCallable callable) {
+ callable.setClose();
+ try {
+ call(callable);
+ } catch (IOException e) {
+ // We used to catch this error, interpret, and rethrow. However, we
+ // have since decided that it's not nice for a scanner's close to
+ // throw exceptions. Chances are it was just an UnknownScanner
+ // exception due to lease time out.
+ LOG.error("Exception caught during closeScanner", e);
+ }
}
- }
- /**
- * Scans a region server, results are put to queue.
- *
- * @return New start key if scanning does not end, null otherwise
- * @throws IOException
- * @throws InterruptedException
- */
- private byte[] scanRegionServer(byte[] startKey) throws IOException,
- InterruptedException {
- // Open a scanner
- ScannerCallable callable = getScannerCallable(startKey);
- // openScanner
- call(callable);
- HRegionInfo currentRegion = callable.getHRegionInfo();
+ /**
+ * Keep scanning on a region server. If we can get some results, they are
+ * returned, otherwise a null is returned.
+ *
+ * startKey is changed if necessary. At the end of scanning, it is set to
+ * null.
+ *
+ * @return a non-empty array of Result if we get some data, null otherwise.
+ */
+ private Result[] scanRegionServer() throws IOException,
+ InterruptedException {
+ if (callable == null) {
+ // Open a scanner
+ callable = getScannerCallable(startKey);
+ // openScanner
+ call(callable);
+ currentRegion = callable.getHRegionInfo();
- Result lastRes = null;
- long lastSuccNextTs = System.currentTimeMillis();
- try {
- while (!closing) {
+ lastRes = null;
+ lastSuccNextTs = System.currentTimeMillis();
+ }
+
+ boolean keepCallable = false;
+
+ try {
Result[] values = call(callable);
if (values == null) {
// End of scanning
- return null;
+ startKey = null;
} else if (values.length == 0) {
// End of region
- return currentRegion.getEndKey();
- }
+ startKey = currentRegion.getEndKey();
+ // Mark startKey as null for last region.
+ if (startKey != null && startKey.length == 0) {
+ startKey = null;
+ }
+ } else {
+ // We got some results
+ lastRes = values[values.length - 1];
+ lastSuccNextTs = System.currentTimeMillis();
+
+ // In this case, we keep callable
+ keepCallable = true;
- lastRes = values[values.length - 1];
- if (!closing) {
- queue.put(values);
+ return values;
}
- lastSuccNextTs = System.currentTimeMillis();
- }
- } catch (DoNotRetryIOException e) {
- boolean canRetry = false;
- if (e instanceof UnknownScannerException) {
- long timeoutTs = lastSuccNextTs + table.scannerTimeout;
- long now = System.currentTimeMillis();
- if (now > timeoutTs) {
- // Scanner timeout
- long elapsed = now - lastSuccNextTs;
- ScannerTimeoutException ex = new ScannerTimeoutException(elapsed
- + "ms pased since the last invocation, "
- + "timetout is current set to " + table.scannerTimeout);
- ex.initCause(e);
- throw ex;
+ } catch (DoNotRetryIOException e) {
+ boolean canRetry = false;
+ if (e instanceof UnknownScannerException) {
+ // The region server may have restarted.
+ long timeoutTs = lastSuccNextTs + table.scannerTimeout;
+ long now = System.currentTimeMillis();
+ if (now > timeoutTs) {
+ // Scanner timeout
+ long elapsed = now - lastSuccNextTs;
+ ScannerTimeoutException ex =
+ new ScannerTimeoutException(elapsed
+ + "ms pased since the last invocation, "
+ + "timetout is current set to " + table.scannerTimeout);
+ ex.initCause(e);
+ throw ex;
+ }
+
+ canRetry = true; // scannerTimeout
+ } else {
+ Throwable cause = e.getCause();
+ if (cause != null && cause instanceof NotServingRegionException) {
+ canRetry = true;
+ }
}
- canRetry = true; // scannerTimeout
- } else {
- Throwable cause = e.getCause();
- if (cause != null && cause instanceof NotServingRegionException) {
- canRetry = true;
+ if (!canRetry) {
+ // Cannot retry, simply throw it out
+ throw e;
}
- }
-
- if (!canRetry) {
- // Cannot retry, simply throw it out
- throw e;
- }
- if (lastRes != null) {
- return Bytes.nextOf(lastRes.getRow());
+ if (lastRes != null) {
+ // Skip lastRes since it has been returned.
+ startKey = Bytes.nextOf(lastRes.getRow());
+ }
+ } finally {
+ if (!keepCallable) {
+ closeScanner(callable);
+ callable = null;
+ }
}
- return startKey;
- } finally {
- closeScanner(callable);
+ return null;
}
- // Only reach here when closing is true
- return null;
- }
- @Override
- public void run() {
- try {
- byte[] startKey = this.scan.getStartRow();
- while (!closing) {
- startKey = scanRegionServer(startKey);
- if (startKey == null || startKey.length == 0) {
- break;
+ /**
+ * Puts results in queue or justFetched.
+ *
+ * @return whether we should continue fetching in this run.
+ */
+ private boolean putResults(Result[] results) {
+ if (!queue.offer(results)) {
+ // queue is full, put results in justFetched
+ justFetched.set(results);
+
+ if (queue.isEmpty()) {
+ // It's possible the queue became empty before justFetched was set
+ // and the main thread is blocking on queue.take().
+ // We try to move the results in justFetched to the queue here.
+ Result[] js = justFetched.getAndSet(null);
+ if (js != null) {
+ queue.add(js);
+ return true;
+ }
+ // If js == null, it means the main thread moved justFetched to
+ // queue and arranged a new run.
}
+ // Then quit from this run. New run is submitted when some results
+ // are taken out of the queue
+ return false;
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- exception = e;
- } catch (Throwable e) {
- exception = e;
+ return true;
}
- try {
- queue.put(EOS);
- } catch (InterruptedException e) {
- LOG.info("Fetching thread interrupted", e);
- Thread.currentThread().interrupt();
+ @Override
+ public void run() {
+ try {
+ while (!closing.get() && startKey != null) {
+ Result[] results = scanRegionServer();
+
+ if (results != null) {
+ if (!putResults(results)) {
+ return;
+ }
+ }
+ }
+ } catch (Throwable e) {
+ exception.set(e);
+ }
+ // We only get here when scanning is over or was aborted with an exception
+ putResults(EOS);
}
}
-
- @Override
- public boolean isClosed() {
- return closed;
- }
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1588111&r1=1588110&r2=1588111&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Thu Apr 17 00:48:37 2014
@@ -113,7 +113,8 @@ public interface HTableInterface {
* Returns a scanner on the current table as specified by the {@link Scan}
* object.
*
- * @param scan A configured {@link Scan} object.
+ * @param scan A configured {@link Scan} object. NOTE: the scan may be kept
+ * and modified internally, so the caller should not reuse it.
* @return A scanner.
* @throws IOException if a remote or network exception occurs.
* @since 0.20.0
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1588111&r1=1588110&r2=1588111&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu Apr 17 00:48:37 2014
@@ -1951,12 +1951,13 @@ REGION_LOOP:
public static int countRows(final HTable t, final Scan s)
throws IOException {
// Assert all rows in table.
- ResultScanner scanner = t.getScanner(s);
- int count = 0;
- for (Result result: scanner) {
- count++;
- assertTrue(result.size() > 0);
+ try (ResultScanner scanner = t.getScanner(s)) {
+ int count = 0;
+ for (Result result : scanner) {
+ count++;
+ assertTrue(result.size() > 0);
+ }
+ return count;
}
- return count;
}
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java?rev=1588111&r1=1588110&r2=1588111&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java Thu Apr 17 00:48:37 2014
@@ -22,11 +22,17 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.StringBytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -39,7 +45,6 @@ public class TestHTableClientScanner {
final Log LOG = LogFactory.getLog(getClass());
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static final byte[] TABLE_NAME = Bytes.toBytes("TABLE");
private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
private static final int SLAVES = 3;
@@ -58,6 +63,7 @@ public class TestHTableClientScanner {
@Test
public void testScanner() throws IOException {
+ final StringBytes TABLE_NAME = new StringBytes("testScanner");
HTable table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 3,
Bytes.toBytes("bbb"), Bytes.toBytes("yyy"), 25);
@@ -66,4 +72,48 @@ public class TestHTableClientScanner {
int counted = HBaseTestingUtility.countRows(table, new Scan());
assertEquals("rowCount", rowCount, counted);
}
+
+ /**
+ * Testing parallel scanning with more threads than background threads.
+ */
+ @Test
+ public void testMoreThreads() throws Exception {
+ final int ROW_COUNT = 10000;
+ final int THREAD_COUNT = Runtime.getRuntime().availableProcessors() + 1;
+ final StringBytes TABLE_NAME = new StringBytes("testMoreThreads");
+
+ HTable table = TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ table.setAutoFlush(false);
+ for (int i = 0; i < ROW_COUNT; i++) {
+ byte[] row = Bytes.toBytes("row-" + i);
+ Put put = new Put(row).add(FAMILY, row, row);
+ table.put(put);
+ }
+ table.flushCommits();
+
+ ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+ Future<?>[] futures = new Future<?>[THREAD_COUNT];
+ for (int i = 0; i < THREAD_COUNT; i++) {
+ futures[i] = executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ HTable table = new HTableAsync(TEST_UTIL.getConfiguration(),
+ TABLE_NAME);
+ try (ResultScanner scanner = table.getScanner(new Scan())) {
+ for (Result result : scanner) {
+ Assert.assertTrue("result.size should > 0", result.size() > 0);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ }
}