You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/05/14 02:33:19 UTC

svn commit: r1594425 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client: ParallelScanner.java ResultScanner.java

Author: liyin
Date: Wed May 14 00:33:19 2014
New Revision: 1594425

URL: http://svn.apache.org/r1594425
Log:
[HBASE-10502] Simplify ParallelScanner

Author: daviddeng

Summary: HTableClientScanner now fetches data in the background, so `ParallelScanner` can be simplified.

Test Plan: `TestParallelScanner`

Reviewers: liyintang, manukranthk, elliott

Reviewed By: elliott

CC: hbase-eng@, elliott

Differential Revision: https://phabricator.fb.com/D1312051

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java?rev=1594425&r1=1594424&r2=1594425&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java Wed May 14 00:33:19 2014
@@ -21,21 +21,11 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.DaemonThreadFactory;
 
 /**
  * ParallelScanner is a utility class for the HBase client to perform multiple scan
@@ -51,7 +41,6 @@ public class ParallelScanner {
   private final List<Scan> scans;
   private final HTable hTable;
   private final int numRows;
-  private final ThreadPoolExecutor parallelScannerPool;
   private List<ResultScanner> resultScanners = new ArrayList<ResultScanner>();
 
   /**
@@ -64,54 +53,19 @@ public class ParallelScanner {
     this.hTable = table;
     this.scans = scans;
     this.numRows = numRows;
-    int threads = table.getConfiguration().getInt(
-      HConstants.HBASE_CLIENT_PARALLEL_SCANNER_THREAD,
-      HConstants.HBASE_CLIENT_PARALLEL_SCANNER_THREAD_DEFAULT);
-
-    // TODO: if launching the thread pool for each ParallelScanner turns out to be a performance
-    // bottleneck, it can be optimized by sharing a common thread pool.
-    parallelScannerPool = new ThreadPoolExecutor(threads, threads,
-      60, TimeUnit.SECONDS,
-      new LinkedBlockingQueue<Runnable>(),
-      new DaemonThreadFactory("ParallelScanner-Thread-"));
-    parallelScannerPool.allowCoreThreadTimeOut(true);
   }
 
   /**
-   * Initialize all the ResultScanners by calling
+   * Initializes all the ResultScanners by calling
    * {@link HTable#getScanner(Scan)} in parallel for each scan request.
    *
    * @throws IOException if any of the getScanners throws out the exceptions
    */
   public void initialize() throws IOException {
-    Map<Scan, Future<ResultScanner>> results = new HashMap<Scan, Future<ResultScanner>>();
-
     for (final Scan scan : scans) {
       // setCaching for each scan
       scan.setCaching(numRows);
-
-      results.put(scan,
-        parallelScannerPool.submit(new Callable<ResultScanner>() {
-          @Override
-          public ResultScanner call() throws IOException {
-            return hTable.getScanner(scan);
-          }
-        })
-      );
-    }
-
-    for (Map.Entry<Scan, Future<ResultScanner>> resultEntry : results.entrySet()) {
-      try {
-        resultScanners.add(resultEntry.getValue().get());
-        // TODO: switch to use multi-catch when compiling client jar with JDK7
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException("Could not get scanners for the scan: "
-          + resultEntry.getKey() + " due to " + e);
-      } catch (ExecutionException e) {
-        throw new IOException("Could not get scanners for the scan: "
-          + resultEntry.getKey() + " due to " + e);
-      }
+      resultScanners.add(hTable.getScanner(scan));
     }
   }
 
@@ -132,57 +86,28 @@ public class ParallelScanner {
       return null;
     }
 
-    List<Result> results = new ArrayList<Result>();
-    Map<ResultScanner, Future<Result[]>> context = new HashMap<ResultScanner, Future<Result[]>>();
+    ArrayList<Result> results = new ArrayList<>();
     for (final ResultScanner scanner : resultScanners) {
-
       if (scanner.isClosed()) {
         continue;   // Skip the closed scanner
       }
 
-      context.put(scanner,
-        parallelScannerPool.submit(new Callable<Result[]>() {
-          @Override
-          public Result[] call() throws IOException {
-            Result[] tmp = scanner.next(numRows);
-          if (tmp == null || tmp.length == 0) { // scanner.next() returns a
-                                                // NON-NULL result.
-              scanner.close(); // close the scanner as there is no data left.
-            }
-            return tmp;
-          }
-        })
-      );
-    }
-
-    for (Map.Entry<ResultScanner, Future<Result[]>> contextEntry : context.entrySet()) {
-      try {
-        Result[] result = contextEntry.getValue().get();
-        if (result != null && result.length != 0) {
-          results.addAll(Arrays.asList(result));
-        }
-      // TODO: switch to use multi-catch when compiling client jar with JDK7
-      } catch (InterruptedException e) {
-        throw new IOException("Could not get next value for the scan: "
-          + contextEntry.getKey() + " due to " + e);
-      } catch (ExecutionException e) {
-        throw new IOException("Could not get next value for the scan: "
-          + contextEntry.getKey() + " due to " + e);
+      Result[] tmp = scanner.next(numRows);
+      if (tmp != null && tmp.length > 0) {
+        results.ensureCapacity(results.size() + tmp.length);
+        Collections.addAll(results, tmp);
       }
     }
+
     return results;
   }
 
   /**
-   * Close all the scanners and shutdown the thread pool
+   * Closes all the scanners and shutdown the thread pool
    */
   public void close() {
-    if (resultScanners.isEmpty()) {
-      return;
-    }
     for (final ResultScanner scanner : resultScanners) {
       scanner.close();
     }
-    this.parallelScannerPool.shutdownNow();
   }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java?rev=1594425&r1=1594424&r2=1594425&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java Wed May 14 00:33:19 2014
@@ -29,20 +29,22 @@ import java.io.IOException;
 public interface ResultScanner extends Closeable, Iterable<Result> {
 
   /**
-   * Grab the next row's worth of values. The scanner will return a Result.
+   * Grabs the next row's worth of values. The scanner will return a Result.
    * The caller may call this method even if the scanner is closed.
    *
    * @return Result object if there is another row, null if the scanner is
    *         exhausted.
-   * @throws IOException
-   *           e
    */
   public Result next() throws IOException;
 
   /**
-   * @param nbRows number of rows to return
+   * Returns up to nbRows of the results. The implementation should return at
+   * least one result unless the scanner is exhausted or closed.
+   *
+   * If no results are available, a null or empty array is returned.
+   *
+   * @param nbRows the maximum number of rows to return
    * @return Between zero and <param>nbRows</param> Results
-   * @throws IOException e
    */
   public Result[] next(int nbRows) throws IOException;