You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/05/14 02:33:19 UTC
svn commit: r1594425 - in
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client:
ParallelScanner.java ResultScanner.java
Author: liyin
Date: Wed May 14 00:33:19 2014
New Revision: 1594425
URL: http://svn.apache.org/r1594425
Log:
[HBASE-10502] Simplify ParallelScanner
Author: daviddeng
Summary: HTableClientScanner now fetches data in the background, so `ParallelScanner` can be simplified.
Test Plan: `TestParallelScanner`
Reviewers: liyintang, manukranthk, elliott
Reviewed By: elliott
CC: hbase-eng@, elliott
Differential Revision: https://phabricator.fb.com/D1312051
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java?rev=1594425&r1=1594424&r2=1594425&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java Wed May 14 00:33:19 2014
@@ -21,21 +21,11 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.DaemonThreadFactory;
/**
* ParallelScanner is a utility class for the HBase client to perform multiple scan
@@ -51,7 +41,6 @@ public class ParallelScanner {
private final List<Scan> scans;
private final HTable hTable;
private final int numRows;
- private final ThreadPoolExecutor parallelScannerPool;
private List<ResultScanner> resultScanners = new ArrayList<ResultScanner>();
/**
@@ -64,54 +53,19 @@ public class ParallelScanner {
this.hTable = table;
this.scans = scans;
this.numRows = numRows;
- int threads = table.getConfiguration().getInt(
- HConstants.HBASE_CLIENT_PARALLEL_SCANNER_THREAD,
- HConstants.HBASE_CLIENT_PARALLEL_SCANNER_THREAD_DEFAULT);
-
- // TODO: if launching the thread pool for each ParallelScanner turns out to be a performance
- // bottleneck, it can be optimized by sharing a common thread pool.
- parallelScannerPool = new ThreadPoolExecutor(threads, threads,
- 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new DaemonThreadFactory("ParallelScanner-Thread-"));
- parallelScannerPool.allowCoreThreadTimeOut(true);
}
/**
- * Initialize all the ResultScanners by calling
+ * Initializes all the ResultScanners by calling
* {@link HTable#getScanner(Scan)} in parallel for each scan request.
*
* @throws IOException if any of the getScanners throws out the exceptions
*/
public void initialize() throws IOException {
- Map<Scan, Future<ResultScanner>> results = new HashMap<Scan, Future<ResultScanner>>();
-
for (final Scan scan : scans) {
// setCaching for each scan
scan.setCaching(numRows);
-
- results.put(scan,
- parallelScannerPool.submit(new Callable<ResultScanner>() {
- @Override
- public ResultScanner call() throws IOException {
- return hTable.getScanner(scan);
- }
- })
- );
- }
-
- for (Map.Entry<Scan, Future<ResultScanner>> resultEntry : results.entrySet()) {
- try {
- resultScanners.add(resultEntry.getValue().get());
- // TODO: switch to use multi-catch when compiling client jar with JDK7
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Could not get scanners for the scan: "
- + resultEntry.getKey() + " due to " + e);
- } catch (ExecutionException e) {
- throw new IOException("Could not get scanners for the scan: "
- + resultEntry.getKey() + " due to " + e);
- }
+ resultScanners.add(hTable.getScanner(scan));
}
}
@@ -132,57 +86,28 @@ public class ParallelScanner {
return null;
}
- List<Result> results = new ArrayList<Result>();
- Map<ResultScanner, Future<Result[]>> context = new HashMap<ResultScanner, Future<Result[]>>();
+ ArrayList<Result> results = new ArrayList<>();
for (final ResultScanner scanner : resultScanners) {
-
if (scanner.isClosed()) {
continue; // Skip the closed scanner
}
- context.put(scanner,
- parallelScannerPool.submit(new Callable<Result[]>() {
- @Override
- public Result[] call() throws IOException {
- Result[] tmp = scanner.next(numRows);
- if (tmp == null || tmp.length == 0) { // scanner.next() returns a
- // NON-NULL result.
- scanner.close(); // close the scanner as there is no data left.
- }
- return tmp;
- }
- })
- );
- }
-
- for (Map.Entry<ResultScanner, Future<Result[]>> contextEntry : context.entrySet()) {
- try {
- Result[] result = contextEntry.getValue().get();
- if (result != null && result.length != 0) {
- results.addAll(Arrays.asList(result));
- }
- // TODO: switch to use multi-catch when compiling client jar with JDK7
- } catch (InterruptedException e) {
- throw new IOException("Could not get next value for the scan: "
- + contextEntry.getKey() + " due to " + e);
- } catch (ExecutionException e) {
- throw new IOException("Could not get next value for the scan: "
- + contextEntry.getKey() + " due to " + e);
+ Result[] tmp = scanner.next(numRows);
+ if (tmp != null && tmp.length > 0) {
+ results.ensureCapacity(results.size() + tmp.length);
+ Collections.addAll(results, tmp);
}
}
+
return results;
}
/**
- * Close all the scanners and shutdown the thread pool
+ * Closes all the scanners.
*/
public void close() {
- if (resultScanners.isEmpty()) {
- return;
- }
for (final ResultScanner scanner : resultScanners) {
scanner.close();
}
- this.parallelScannerPool.shutdownNow();
}
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java?rev=1594425&r1=1594424&r2=1594425&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java Wed May 14 00:33:19 2014
@@ -29,20 +29,22 @@ import java.io.IOException;
public interface ResultScanner extends Closeable, Iterable<Result> {
/**
- * Grab the next row's worth of values. The scanner will return a Result.
+ * Grabs the next row's worth of values. The scanner will return a Result.
* The caller may call this method even if the scanner is closed.
*
* @return Result object if there is another row, null if the scanner is
* exhausted.
- * @throws IOException
- * e
*/
public Result next() throws IOException;
/**
- * @param nbRows number of rows to return
+ * Returns up to nbRows of the results. The implementation should return at
+ * least one result unless the scanner is exhausted or closed.
+ *
+ * If no results are available, a null or empty array is returned.
+ *
+ * @param nbRows the maximum number of rows to return
* @return Between zero and <param>nbRows</param> Results
- * @throws IOException e
*/
public Result[] next(int nbRows) throws IOException;