You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/17 02:48:37 UTC

svn commit: r1588111 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hbase/ test/java/org/apache/hadoop/hbase/client/

Author: liyin
Date: Thu Apr 17 00:48:37 2014
New Revision: 1588111

URL: http://svn.apache.org/r1588111
Log:
[HBASE-10709] Fix the task-rejection exception thrown in HTableClientScanner when too many scanners are running at the same time.

Author: daviddeng

Summary:
The executor's work queue, `SynchronousQueue`, is replaced with `LinkedBlockingQueue` (via `Executors.newFixedThreadPool`).
The background thread is no longer blocked waiting for the main thread to consume the data.

Test Plan: `TestHTableClientScanner`

Reviewers: liyintang, manukranthk, elliott, ehwang

Reviewed By: ehwang

CC: hbase-eng@, elliott, ehwang

Differential Revision: https://phabricator.fb.com/D1271322

Task ID: 4073520

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java?rev=1588111&r1=1588110&r2=1588111&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java Thu Apr 17 00:48:37 2014
@@ -24,9 +24,9 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,13 +36,14 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
 
 /**
  * Implements the scanner interface for the HBase client.
  * If there are multiple regions in a table, this scanner will iterate
  * through them all.
  */
-public class HTableClientScanner implements ResultScanner, Runnable {
+public class HTableClientScanner implements ResultScanner {
   private static final Log LOG = LogFactory.getLog(HTableClientScanner.class);
   // End of Scanning
   private static final Result[] EOS = new Result[0];
@@ -50,14 +51,10 @@ public class HTableClientScanner impleme
   private static final int MAX_THREADS_IN_POOL = Runtime.getRuntime()
       .availableProcessors();
 
-  private static final ExecutorService executor = new ThreadPoolExecutor(1,
-      MAX_THREADS_IN_POOL, 60L, TimeUnit.SECONDS,
-      new SynchronousQueue<Runnable>());
-
-  // HEADSUP: The scan internal start row can change as we move through table.
-  protected final Scan scan;
-  // The number of prefetched and cached results
-  private final int caching;
+  private static final ExecutorService executor = Executors.newFixedThreadPool(
+      MAX_THREADS_IN_POOL, new DaemonThreadFactory(
+          "HTableClientScanner.Fetching."));
+
   // Temporary results list in main thread, may be null
   private Result[] currentResults;
   // The position of next unfetched results in currentResults if it is
@@ -65,37 +62,44 @@ public class HTableClientScanner impleme
   private int currentPos;
   // Whether this client has closed.
   private boolean closed;
-  /**
-   * The queue transferring fetched Result[] to main thread.
-   * When queue.take() returns an EOS, scanning ends.
-   */
+  // The queue transferring fetched Result[] to main thread.
+  // When queue.take() returns an EOS, scanning ends.
   private final ArrayBlockingQueue<Result[]> queue;
+  // A place storing Result[] in case the queue is full. It is set only at
+  // fetcher thread, will be cleared in main thread.
+  private final AtomicReference<Result[]> justFetched = new AtomicReference<>();
+  // Contains exception thrown in fetcher thread.
+  private final AtomicReference<Throwable> exception = new AtomicReference<>();
   // The variable informing fetching thread to stop
-  private volatile boolean closing;
-  // Contains the exception caught in fetch thread.
-  private volatile Throwable exception;
+  private final AtomicBoolean closing = new AtomicBoolean(false);
 
-  private final HTable table;
+  private final Fetcher fetcher;
 
   /**
    * Constructor.
+   *
+   * @param scan The scan internal start row can change as we move through
+   *          table.
    */
-  public HTableClientScanner(Scan scan, HTable table) {
-    this.scan = scan;
-    this.table = table;
+  public HTableClientScanner(Scan scan, HTable table) throws IOException {
     this.queue = new ArrayBlockingQueue<>(table.getConfiguration().getInt(
         HConstants.HBASE_CLIENT_SCANNER_QUEUE_LENGTH,
         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_QUEUE_LENGTH));
 
+    int caching;
     if (scan.getCaching() > 0) {
-      this.caching = scan.getCaching();
+      caching = scan.getCaching();
     } else {
-      this.caching = table.getScannerCaching();
+      caching = table.getScannerCaching();
     }
+
+    fetcher = new Fetcher(table, scan, caching, queue, justFetched, exception,
+        closing);
   }
 
   HTableClientScanner initialize() {
-    executor.execute(this);
+    executor.execute(fetcher);
+
     return this;
   }
 
@@ -117,7 +121,7 @@ public class HTableClientScanner impleme
   }
 
   /**
-   * Fetches results from queue to currentResults if it is not null.
+   * Fetches results from queue to currentResults if it is null.
    *
    * @return true if more results available, false if end of scanning
    */
@@ -132,27 +136,39 @@ public class HTableClientScanner impleme
 
     try {
       currentResults = queue.take();
+
       if (currentResults.length == 0) {
         // End of scanning
         closed = true;
         currentResults = null;
 
-        if (exception != null) {
-
+        Throwable e = this.exception.get();
+        if (e != null) {
           // Failure of scanning
-          throwIOException(exception);
+          throwIOException(e);
         }
 
         return false;
       }
 
-      // Results fetched
-      currentPos = 0;
-      return true;
+      Result[] jf = justFetched.getAndSet(null);
+      if (jf != null) {
+        // Results were parked in justFetched because the queue was full when
+        // they were fetched. The fetching task should not be running now.
+        queue.add(jf);
+        if (jf.length > 0) {
+          // We may have more results
+          executor.execute(fetcher);
+        }
+      }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
     }
+
+    // Results fetched
+    currentPos = 0;
+    return true;
   }
 
   @Override
@@ -212,7 +228,7 @@ public class HTableClientScanner impleme
     if (this.closed) {
       return;
     }
-    this.closing = true;
+    this.closing.set(true);
     try {
       while (fetchFromQueue()) {
         // skip all results
@@ -224,137 +240,219 @@ public class HTableClientScanner impleme
     }
   }
 
-  private Result[] call(ScannerCallable callable) throws IOException {
-    return table.getConnectionAndResetOperationContext()
-        .getRegionServerWithRetries(callable);
+  @Override
+  public boolean isClosed() {
+    return closed;
   }
 
-  // Returns a ScannerCallable with a start key
-  private ScannerCallable getScannerCallable(byte[] startKey) {
-    scan.setStartRow(startKey);
-    ScannerCallable s = new ScannerCallable(
-        table.getConnectionAndResetOperationContext(),
-        table.getTableNameStringBytes(),
-        scan, table.getOptions());
-    s.setCaching(caching);
-    return s;
-  }
+  private static class Fetcher implements Runnable {
+    // The startKey for opening a scanner.
+    private byte[] startKey;
+    // The callable for scanning
+    private ScannerCallable callable;
+    // Current scanning region info.
+    private HRegionInfo currentRegion;
+    // Timestamp of last successful scan
+    private long lastSuccNextTs;
+    // The last result returned.
+    private Result lastRes = null;
+
+    private final HTable table;
+    private final Scan scan;
+    private final int caching;
+
+    private final ArrayBlockingQueue<Result[]> queue;
+    private final AtomicReference<Result[]> justFetched;
+    private final AtomicReference<Throwable> exception;
+    private final AtomicBoolean closing;
+
+    public Fetcher(HTable table, Scan scan, int caching,
+        ArrayBlockingQueue<Result[]> queue,
+        AtomicReference<Result[]> justFetched,
+        AtomicReference<Throwable> exception, AtomicBoolean closing) {
+      this.table = table;
+      this.scan = scan;
+      this.caching = caching;
+
+      this.queue = queue;
+      this.justFetched = justFetched;
+      this.exception = exception;
+      this.closing = closing;
+
+      // Initialize startKey
+      startKey = scan.getStartRow();
+      if (startKey == null) {
+        // In case startKey == null, set it to zero-length byte array since
+        // null means end-of-scan.
+        startKey = HConstants.EMPTY_BYTE_ARRAY;
+      }
+    }
 
-  // Closes a callable silently.
-  private void closeScanner(ScannerCallable callable) {
-    callable.setClose();
-    try {
-      call(callable);
-    } catch (IOException e) {
-      // We used to catch this error, interpret, and rethrow. However, we
-      // have since decided that it's not nice for a scanner's close to
-      // throw exceptions. Chances are it was just an UnknownScanner
-      // exception due to lease time out.
-      LOG.error("Exception caught during closeScanner", e);
+    private Result[] call(ScannerCallable callable) throws IOException {
+      return table.getConnectionAndResetOperationContext()
+          .getRegionServerWithRetries(callable);
+    }
+
+    // Returns a ScannerCallable with a start key
+    private ScannerCallable getScannerCallable(byte[] startKey) {
+      scan.setStartRow(startKey);
+      ScannerCallable s =
+          new ScannerCallable(table.getConnectionAndResetOperationContext(),
+              table.getTableNameStringBytes(), scan, table.getOptions());
+      s.setCaching(caching);
+      return s;
+    }
+
+    // Closes a callable silently.
+    private void closeScanner(ScannerCallable callable) {
+      callable.setClose();
+      try {
+        call(callable);
+      } catch (IOException e) {
+        // We used to catch this error, interpret, and rethrow. However, we
+        // have since decided that it's not nice for a scanner's close to
+        // throw exceptions. Chances are it was just an UnknownScanner
+        // exception due to lease time out.
+        LOG.error("Exception caught during closeScanner", e);
+      }
     }
-  }
 
-  /**
-   * Scans a region server, results are put to queue.
-   *
-   * @return New start key if scanning does not end, null otherwise
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private byte[] scanRegionServer(byte[] startKey) throws IOException,
-      InterruptedException {
-    // Open a scanner
-    ScannerCallable callable = getScannerCallable(startKey);
-    // openScanner
-    call(callable);
-    HRegionInfo currentRegion = callable.getHRegionInfo();
+    /**
+     * Keep scanning on a region server. If we can get some results, they are
+     * returned, otherwise a null is returned.
+     *
+     * startKey is changed if necessary. At the end of scanning, it is set to
+     * null.
+     *
+     * @return a non-empty array of Result if we get some data, null otherwise.
+     */
+    private Result[] scanRegionServer() throws IOException,
+        InterruptedException {
+      if (callable == null) {
+        // Open a scanner
+        callable = getScannerCallable(startKey);
+        // openScanner
+        call(callable);
+        currentRegion = callable.getHRegionInfo();
 
-    Result lastRes = null;
-    long lastSuccNextTs = System.currentTimeMillis();
-    try {
-      while (!closing) {
+        lastRes = null;
+        lastSuccNextTs = System.currentTimeMillis();
+      }
+
+      boolean keepCallable = false;
+
+      try {
         Result[] values = call(callable);
         if (values == null) {
           // End of scanning
-          return null;
+          startKey = null;
         } else if (values.length == 0) {
           // End of region
-          return currentRegion.getEndKey();
-        }
+          startKey = currentRegion.getEndKey();
+          // Mark startKey as null for last region.
+          if (startKey != null && startKey.length == 0) {
+            startKey = null;
+          }
+        } else {
+          // We got some results
+          lastRes = values[values.length - 1];
+          lastSuccNextTs = System.currentTimeMillis();
+
+          // In this case, we keep callable
+          keepCallable = true;
 
-        lastRes = values[values.length - 1];
-        if (!closing) {
-          queue.put(values);
+          return values;
         }
-        lastSuccNextTs = System.currentTimeMillis();
-      }
-    } catch (DoNotRetryIOException e) {
-      boolean canRetry = false;
-      if (e instanceof UnknownScannerException) {
-        long timeoutTs = lastSuccNextTs + table.scannerTimeout;
-        long now = System.currentTimeMillis();
-        if (now > timeoutTs) {
-          // Scanner timeout
-          long elapsed = now - lastSuccNextTs;
-          ScannerTimeoutException ex = new ScannerTimeoutException(elapsed
-              + "ms pased since the last invocation, "
-              + "timetout is current set to " + table.scannerTimeout);
-          ex.initCause(e);
-          throw ex;
+      } catch (DoNotRetryIOException e) {
+        boolean canRetry = false;
+        if (e instanceof UnknownScannerException) {
+          // The region server may have restarted.
+          long timeoutTs = lastSuccNextTs + table.scannerTimeout;
+          long now = System.currentTimeMillis();
+          if (now > timeoutTs) {
+            // Scanner timeout
+            long elapsed = now - lastSuccNextTs;
+            ScannerTimeoutException ex =
+                new ScannerTimeoutException(elapsed
+                    + "ms pased since the last invocation, "
+                    + "timetout is current set to " + table.scannerTimeout);
+            ex.initCause(e);
+            throw ex;
+          }
+
+          canRetry = true; // scannerTimeout
+        } else {
+          Throwable cause = e.getCause();
+          if (cause != null && cause instanceof NotServingRegionException) {
+            canRetry = true;
+          }
         }
 
-        canRetry = true; // scannerTimeout
-      } else {
-        Throwable cause = e.getCause();
-        if (cause != null && cause instanceof NotServingRegionException) {
-          canRetry = true;
+        if (!canRetry) {
+          // Cannot retry, simply throw it out
+          throw e;
         }
-      }
-
-      if (!canRetry) {
-        // Cannot retry, simply throw it out
-        throw e;
-      }
 
-      if (lastRes != null) {
-        return Bytes.nextOf(lastRes.getRow());
+        if (lastRes != null) {
+          // Skip lastRes since it has been returned.
+          startKey = Bytes.nextOf(lastRes.getRow());
+        }
+      } finally {
+        if (!keepCallable) {
+          closeScanner(callable);
+          callable = null;
+        }
       }
 
-      return startKey;
-    } finally {
-      closeScanner(callable);
+      return null;
     }
-    // Only reach here when closing is true
-    return null;
-  }
 
-  @Override
-  public void run() {
-    try {
-      byte[] startKey = this.scan.getStartRow();
-      while (!closing) {
-        startKey = scanRegionServer(startKey);
-        if (startKey == null || startKey.length == 0) {
-          break;
+    /**
+     * Puts results in queue or justFetched.
+     *
+     * @return whether we should continue fetching in this run.
+     */
+    private boolean putResults(Result[] results) {
+      if (!queue.offer(results)) {
+        // queue is full, put results in justFetched
+        justFetched.set(results);
+
+        if (queue.isEmpty()) {
+          // It's possible the queue is empty before justFetched is set
+          // and the main thread is blocking on queue.take().
+          // We try move results in justFetched to queue here.
+          Result[] js = justFetched.getAndSet(null);
+          if (js != null) {
+            queue.add(js);
+            return true;
+          }
+          // If js == null, it means the main thread moved justFetched to
+          // queue and arranged a new run.
         }
+        // Then quit from this run. New run is submitted when some results
+        // are taken out of the queue
+        return false;
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      exception = e;
-    } catch (Throwable e) {
-      exception = e;
+      return true;
     }
 
-    try {
-      queue.put(EOS);
-    } catch (InterruptedException e) {
-      LOG.info("Fetching thread interrupted", e);
-      Thread.currentThread().interrupt();
+    @Override
+    public void run() {
+      try {
+        while (!closing.get() && startKey != null) {
+          Result[] results = scanRegionServer();
+
+          if (results != null) {
+            if (!putResults(results)) {
+              return;
+            }
+          }
+        }
+      } catch (Throwable e) {
+        exception.set(e);
+      }
+      // We only get here when scanning is over or aborted with an exception
+      putResults(EOS);
     }
   }
-
-  @Override
-  public boolean isClosed() {
-    return closed;
-  }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1588111&r1=1588110&r2=1588111&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Thu Apr 17 00:48:37 2014
@@ -113,7 +113,8 @@ public interface HTableInterface {
    * Returns a scanner on the current table as specified by the {@link Scan}
    * object.
    *
-   * @param scan A configured {@link Scan} object.
+   * @param scan A configured {@link Scan} object. NOTE scan may be kept and
+   *             changed inside. The caller should not reuse it.
    * @return A scanner.
    * @throws IOException if a remote or network exception occurs.
    * @since 0.20.0

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1588111&r1=1588110&r2=1588111&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu Apr 17 00:48:37 2014
@@ -1951,12 +1951,13 @@ REGION_LOOP:
   public static int countRows(final HTable t, final Scan s)
     throws IOException {
     // Assert all rows in table.
-    ResultScanner scanner = t.getScanner(s);
-    int count = 0;
-    for (Result result: scanner) {
-      count++;
-      assertTrue(result.size() > 0);
+    try (ResultScanner scanner = t.getScanner(s)) {
+      int count = 0;
+      for (Result result : scanner) {
+        count++;
+        assertTrue(result.size() > 0);
+      }
+      return count;
     }
-    return count;
   }
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java?rev=1588111&r1=1588110&r2=1588111&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestHTableClientScanner.java Thu Apr 17 00:48:37 2014
@@ -22,11 +22,17 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.StringBytes;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -39,7 +45,6 @@ public class TestHTableClientScanner {
   final Log LOG = LogFactory.getLog(getClass());
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private static final byte[] TABLE_NAME = Bytes.toBytes("TABLE");
   private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
   private static final int SLAVES = 3;
 
@@ -58,6 +63,7 @@ public class TestHTableClientScanner {
 
   @Test
   public void testScanner() throws IOException {
+    final StringBytes TABLE_NAME = new StringBytes("testScanner");
     HTable table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 3,
         Bytes.toBytes("bbb"), Bytes.toBytes("yyy"), 25);
 
@@ -66,4 +72,48 @@ public class TestHTableClientScanner {
     int counted = HBaseTestingUtility.countRows(table, new Scan());
     assertEquals("rowCount", rowCount, counted);
   }
+
+  /**
+   * Testing parallel scanning with more threads than background threads.
+   */
+  @Test
+  public void testMoreThreads() throws Exception {
+    final int ROW_COUNT = 10000;
+    final int THREAD_COUNT = Runtime.getRuntime().availableProcessors() + 1;
+    final StringBytes TABLE_NAME = new StringBytes("testMoreThreads");
+
+    HTable table = TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    table.setAutoFlush(false);
+    for (int i = 0; i < ROW_COUNT; i++) {
+      byte[] row = Bytes.toBytes("row-" + i);
+      Put put = new Put(row).add(FAMILY, row, row);
+      table.put(put);
+    }
+    table.flushCommits();
+
+    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+    Future<?>[] futures = new Future<?>[THREAD_COUNT];
+    for (int i = 0; i < THREAD_COUNT; i++) {
+      futures[i] = executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            HTable table = new HTableAsync(TEST_UTIL.getConfiguration(),
+                TABLE_NAME);
+            try (ResultScanner scanner = table.getScanner(new Scan())) {
+              for (Result result : scanner) {
+                Assert.assertTrue("result.size should > 0", result.size() > 0);
+              }
+            }
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+
+    for (Future<?> future : futures) {
+      future.get();
+    }
+  }
 }