You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/02/11 19:28:40 UTC

svn commit: r1567270 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/rest/client/ test/java/org/apache/hadoop/hbase/ test/java/org/apache/hadoop/hbase/client/

Author: liyin
Date: Tue Feb 11 18:28:40 2014
New Revision: 1567270

URL: http://svn.apache.org/r1567270
Log:
[HBASE-10502] ParallelScanner: a client utility to perform multiple scan requests in parallel.

Author: liyintang

Summary:
ParallelScanner is a utility class for the HBase client to perform multiple scan requests in parallel. For simplicity, it requires all the scan requests to use the same caching size.

This class provides three basic operations: {@link #initialize()}, {@link #next()} and {@link #close()}.
* The initialize function initializes all the ResultScanners by calling {@link HTable#getScanner(Scan)} in parallel for each scan request.
* The next function calls the corresponding {@link ResultScanner#next(int numRows)} for each scan request in parallel and returns all the results together as a list. An empty result list indicates that no data is left in any of the scanners, and the user can then call {@link #close()}.
* The close function closes all the scanners and shuts down the thread pool.

This diff also includes a unit test, TestParallelScanner, that covers the functionality of ParallelScanner.

Test Plan: Run through all the scan related unit tests and TestParallelScanner

Reviewers: manukranthk

Reviewed By: manukranthk

CC: hbase-dev@, daviddeng

Differential Revision: https://phabricator.fb.com/D1167268

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestParallelScanner.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1567270&r1=1567269&r2=1567270&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Feb 11 18:28:40 2014
@@ -584,6 +584,17 @@ public final class HConstants {
   public static final String HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY = "hbase.client.scanner.max.result.size";
 
   /**
+   * Configuration key for the maximum number of threads in each ParallelScanner's pool.
+   */
+  public static final String HBASE_CLIENT_PARALLEL_SCANNER_THREAD =
+    "hbase.client.parallel.scanner.thread";
+
+  /**
+   * Default value for {@link #HBASE_CLIENT_PARALLEL_SCANNER_THREAD}.
+   */
+  public static final int HBASE_CLIENT_PARALLEL_SCANNER_THREAD_DEFAULT = 100;
+
+  /**
    * Maximum number of bytes returned when calling a scanner's next method.
    * Note that when a single row is larger than this limit the row is still
    * returned completely.

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java?rev=1567270&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ParallelScanner.java Tue Feb 11 18:28:40 2014
@@ -0,0 +1,245 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * ParallelScanner is a utility class for the HBase client to perform multiple scan
+ * requests in parallel.
+ *
+ * For simplicity, all the scan requests use the same caching size: {@link #initialize()}
+ * overwrites the caching of every {@link Scan} passed in with the {@code numRows}
+ * constructor argument. It provides 3 basic operations: {@link #initialize()},
+ * {@link #next()} and {@link #close()}.
+ *
+ * The class performs no internal synchronization, so a single instance should only be
+ * driven from one thread at a time.
+ */
+public class ParallelScanner {
+  public static final Log LOG = LogFactory.getLog(ParallelScanner.class);
+  private final List<Scan> scans;
+  private final HTable hTable;
+  private final int numRows;
+  private final ThreadPoolExecutor parallelScannerPool;
+  // The opened scanners, in the same order as the scans; filled by initialize().
+  private final List<ResultScanner> resultScanners = new ArrayList<ResultScanner>();
+
+  /**
+   * To construct a ParallelScanner.
+   * @param table The common HTable instance for each scan request
+   * @param scans The list of scan requests. The list is copied; the Scan objects are
+   *   shared and get their caching size overwritten by {@link #initialize()}.
+   * @param numRows The number of rows that each scan request fetches in one RPC call;
+   *   must be positive.
+   * @throws IllegalArgumentException if {@code numRows} is not positive
+   */
+  public ParallelScanner(HTable table, List<Scan> scans, int numRows) {
+    if (numRows <= 0) {
+      throw new IllegalArgumentException("numRows must be positive, but was " + numRows);
+    }
+    this.hTable = table;
+    this.scans = new ArrayList<Scan>(scans);
+    this.numRows = numRows;
+    int threads = table.getConfiguration().getInt(
+      HConstants.HBASE_CLIENT_PARALLEL_SCANNER_THREAD,
+      HConstants.HBASE_CLIENT_PARALLEL_SCANNER_THREAD_DEFAULT);
+    // At most one task per scan is in flight at a time, so threads beyond the number of
+    // scans would never be used; ThreadPoolExecutor also requires at least one thread.
+    threads = Math.max(1, Math.min(threads, this.scans.size()));
+
+    // TODO: if launching the thread pool for each ParallelScanner turns out to be a performance
+    // bottleneck, it can be optimized by sharing a common thread pool.
+    parallelScannerPool = new ThreadPoolExecutor(threads, threads,
+      60, TimeUnit.SECONDS,
+      new LinkedBlockingQueue<Runnable>(),
+      new DaemonThreadFactory("ParallelScanner-Thread-"));
+    parallelScannerPool.allowCoreThreadTimeOut(true);
+  }
+
+  /**
+   * Initialize all the ResultScanners by calling
+   * {@link HTable#getScanner(Scan)} in parallel for each scan request.
+   *
+   * If any getScanner call fails, every scanner that was opened successfully is closed
+   * again and this ParallelScanner stays uninitialized, so the call may be retried.
+   *
+   * @throws IOException if any of the getScanner calls fails
+   * @throws InterruptedIOException if the calling thread is interrupted while waiting
+   * @throws IllegalStateException if the scanners have already been initialized
+   */
+  public void initialize() throws IOException {
+    if (!resultScanners.isEmpty()) {
+      throw new IllegalStateException("ParallelScanner has already been initialized");
+    }
+    List<Future<ResultScanner>> futures =
+      new ArrayList<Future<ResultScanner>>(scans.size());
+    for (final Scan scan : scans) {
+      // Every scan uses the same caching, matching the numRows rows next() asks for.
+      scan.setCaching(numRows);
+      futures.add(parallelScannerPool.submit(new Callable<ResultScanner>() {
+        @Override
+        public ResultScanner call() throws IOException {
+          return hTable.getScanner(scan);
+        }
+      }));
+    }
+
+    List<ResultScanner> opened = new ArrayList<ResultScanner>(futures.size());
+    IOException failure = null;
+    // Keep waiting after a failure so that every scanner opened in the background is
+    // collected (and closed below) instead of being leaked.
+    for (int i = 0; i < futures.size(); i++) {
+      try {
+        opened.add(futures.get(i).get());
+      } catch (InterruptedException e) {
+        cancelAll(futures);
+        closeAll(opened);
+        throw interruptedIOException("Interrupted while opening the scanner for the scan: "
+          + scans.get(i), e);
+      } catch (ExecutionException e) {
+        if (failure == null) {
+          failure = new IOException("Could not get scanners for the scan: "
+            + scans.get(i) + " due to " + e.getCause(), e.getCause());
+        }
+      }
+    }
+    if (failure != null) {
+      closeAll(opened);
+      throw failure;
+    }
+    resultScanners.addAll(opened);
+  }
+
+  /**
+   * After the user has called {@link #initialize()}, this function will call the
+   * corresponding {@link ResultScanner#next(int numRows)} from each open scanner in
+   * parallel, and then return all the results together as a list, ordered by the
+   * position of their scan request. A scanner that returns no rows is closed and skipped
+   * by later calls. An empty list means there is no data left for all the scanners (or
+   * that {@link #initialize()} never succeeded), and the user can call
+   * {@link #close()} afterwards.
+   *
+   * @return a list of results, which comes from the return of each
+   * {@link ResultScanner#next(int numRows)}; never null
+   * @throws IOException if any of the scanners fails; all the other fetches are still
+   *   awaited first, and their rows are discarded
+   * @throws InterruptedIOException if the calling thread is interrupted while waiting
+   */
+  public List<Result> next() throws IOException {
+    List<Result> results = new ArrayList<Result>();
+    if (resultScanners.isEmpty()) {
+      LOG.warn("There is no ResultScanner available for this ParallelScanner.");
+      return results;
+    }
+
+    List<ResultScanner> active = new ArrayList<ResultScanner>(resultScanners.size());
+    List<Future<Result[]>> futures = new ArrayList<Future<Result[]>>(resultScanners.size());
+    for (final ResultScanner scanner : resultScanners) {
+      if (scanner.isClosed()) {
+        continue;   // Skip the scanners that have already run out of data.
+      }
+      active.add(scanner);
+      futures.add(parallelScannerPool.submit(new Callable<Result[]>() {
+        @Override
+        public Result[] call() throws IOException {
+          Result[] batch = scanner.next(numRows);
+          if (batch == null || batch.length == 0) {
+            scanner.close(); // close the scanner as there is no data left.
+            return new Result[0];
+          }
+          return batch;
+        }
+      }));
+    }
+
+    IOException failure = null;
+    // Wait for every fetch before returning or throwing, so that no pool thread is still
+    // using a scanner when the caller moves on (e.g. to close()).
+    for (int i = 0; i < futures.size(); i++) {
+      try {
+        results.addAll(Arrays.asList(futures.get(i).get()));
+      } catch (InterruptedException e) {
+        cancelAll(futures);
+        throw interruptedIOException("Interrupted while fetching results from the scanner: "
+          + active.get(i), e);
+      } catch (ExecutionException e) {
+        if (failure == null) {
+          failure = new IOException("Could not fetch results from the scanner: "
+            + active.get(i) + " due to " + e.getCause(), e.getCause());
+        }
+      }
+    }
+    if (failure != null) {
+      throw failure;
+    }
+    return results;
+  }
+
+  /**
+   * Close all the scanners that are still open and shut down the thread pool. The pool
+   * is shut down even if {@link #initialize()} was never called or failed.
+   */
+  public void close() {
+    try {
+      closeAll(resultScanners);
+    } finally {
+      parallelScannerPool.shutdownNow();
+    }
+  }
+
+  /** Closes every scanner that is not closed yet; a failure is logged, not rethrown. */
+  private static void closeAll(List<ResultScanner> scanners) {
+    for (ResultScanner scanner : scanners) {
+      if (scanner.isClosed()) {
+        continue;
+      }
+      try {
+        scanner.close();
+      } catch (RuntimeException e) {
+        LOG.warn("Failed to close the scanner: " + scanner, e);
+      }
+    }
+  }
+
+  /** Cancels every task, interrupting the ones that are already running. */
+  private static void cancelAll(List<? extends Future<?>> futures) {
+    for (Future<?> future : futures) {
+      future.cancel(true);
+    }
+  }
+
+  /** Restores the thread's interrupt status and wraps the interruption as an IOException. */
+  private static InterruptedIOException interruptedIOException(String message,
+      InterruptedException cause) {
+    Thread.currentThread().interrupt();
+    InterruptedIOException ioe = new InterruptedIOException(message);
+    ioe.initCause(cause);
+    return ioe;
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java?rev=1567270&r1=1567269&r2=1567270&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java Tue Feb 11 18:28:40 2014
@@ -47,4 +47,9 @@ public interface ResultScanner extends C
    * Closes the scanner and releases any resources it has allocated
    */
   public void close();
+
+  /**
+   * @return true if this scanner has already been closed, otherwise false.
+   */
+  public boolean isClosed();
 }
\ No newline at end of file

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java?rev=1567270&r1=1567269&r2=1567270&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java Tue Feb 11 18:28:40 2014
@@ -20,11 +20,6 @@
 
 package org.apache.hadoop.hbase.client;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
@@ -32,6 +27,11 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+
 /**
  * This abstract class was designed in order to share code across ClientScanner
  * and ClientLocalScanner and unify the common code.
@@ -181,6 +181,11 @@ public abstract class ResultScannerImpl 
     closed = true;
   }
 
+  @Override
+  public boolean isClosed() {
+    return this.closed;
+  }
+
   protected abstract void closeCurrentScanner();
 
   /**

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java?rev=1567270&r1=1567269&r2=1567270&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java Tue Feb 11 18:28:40 2014
@@ -399,6 +399,7 @@ public class RemoteHTable implements HTa
   class Scanner implements ResultScanner {
 
     String uri;
+    boolean isClosed = false; // becomes true once this scanner has been closed
 
     public Scanner(Scan scan) throws IOException {
       ScannerModel model;
@@ -522,6 +523,12 @@ public class RemoteHTable implements HTa
       } catch (IOException e) {
         LOG.warn(StringUtils.stringifyException(e));
       }
+      isClosed = true;
+    }
+
+    @Override
+    public boolean isClosed() {
+      return this.isClosed;
     }
 
   }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1567270&r1=1567269&r2=1567270&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Tue Feb 11 18:28:40 2014
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -1720,4 +1721,36 @@ REGION_LOOP:
   public BlockCache getBlockCache() {
     return cacheConf.getBlockCache();
   }
+
+  /**
+   * Assert that a full scan of the given HTable returns the expected number of rows.
+   * @param t The HTable instance to check
+   * @param expected The expected number of rows
+   * @throws IOException if the scan fails
+   */
+  public static void assertRowCount(final HTable t, final int expected)
+    throws IOException {
+    assertEquals(expected, countRows(t, new Scan()));
+  }
+
+  /**
+   * Count the rows returned by a scan request, asserting that each row is non-empty.
+   * @param t The HTable instance to count the rows
+   * @param s The scan request for counting
+   * @return Count of rows in table for a given scan request
+   * @throws IOException if opening or reading the scanner fails
+   */
+  public static int countRows(final HTable t, final Scan s) throws IOException {
+    ResultScanner scanner = t.getScanner(s);
+    int count = 0;
+    try {
+      for (Result result : scanner) {
+        count++;
+        assertTrue(result.size() > 0);
+      }
+    } finally {
+      scanner.close();
+    }
+    return count;
+  }
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner.java?rev=1567270&r1=1567269&r2=1567270&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner.java Tue Feb 11 18:28:40 2014
@@ -87,7 +87,7 @@ public class TestClientLocalScanner {
     TEST_UTIL.loadTable(t, FAMILY2);
     t.flushCommits();
     TEST_UTIL.flush(name);
-    assertRowCount(t, rowCount);
+    TEST_UTIL.assertRowCount(t, rowCount);
     Scan scan = getScan(100, 100, true, FAMILY);
     assertTrue(compareScanners(tmpTable.getScanner(scan),
         t.getLocalScanner(scan)));
@@ -179,10 +179,10 @@ public class TestClientLocalScanner {
     t.flushCommits();
     TEST_UTIL.flush(name);
 
-    assertRowCount(t, rowCount);
+    TEST_UTIL.assertRowCount(t, rowCount);
     // Split the table.  Should split on a reasonable key; 'lqj'
     Map<HRegionInfo, HServerAddress> regions  = splitTable(t);
-    assertRowCount(t, rowCount);
+    TEST_UTIL.assertRowCount(t, rowCount);
     // Get end key of first region.
     byte [] endKey = regions.keySet().iterator().next().getEndKey();
     // Count rows with a filter that stops us before passed 'endKey'.
@@ -257,7 +257,7 @@ public class TestClientLocalScanner {
     int rowCount = TEST_UTIL.loadTable(t, FAMILY);
     t.flushCommits();
     TEST_UTIL.flush(tableName);
-    assertRowCount(t, rowCount);
+    TEST_UTIL.assertRowCount(t, rowCount);
 
     Scan scan = getScan(100, 100, true, FAMILY);
     FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
@@ -376,30 +376,6 @@ public class TestClientLocalScanner {
     }
   }
 
-  private void assertRowCount(final HTable t, final int expected)
-    throws IOException {
-    assertEquals(expected, countRows(t, new Scan()));
-  }
-
-  /**
-   * @param t
-   * @param s
-   * @return Count of rows in table.
-   * @throws IOException
-   */
-  private int countRows(final HTable t, final Scan s)
-  throws IOException {
-    // Assert all rows in table.
-    ResultScanner scanner = t.getScanner(s);
-    int count = 0;
-    for (Result result: scanner) {
-      count++;
-      assertTrue(result.size() > 0);
-      // LOG.info("Count=" + count + ", row=" + Bytes.toString(result.getRow()));
-    }
-    return count;
-  }
-
   /**
    * @param t
    * @param s

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestParallelScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestParallelScanner.java?rev=1567270&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestParallelScanner.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestParallelScanner.java Tue Feb 11 18:28:40 2014
@@ -0,0 +1,114 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link ParallelScanner} against a multi-region table on a mini cluster.
+ */
+public class TestParallelScanner {
+  private static final Log LOG = LogFactory.getLog(TestParallelScanner.class);
+  private final static HBaseTestingUtility TEST_UTIL =
+    new HBaseTestingUtility();
+  private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
+  private static final int SLAVES = 3;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(SLAVES);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Scans every region of a loaded table both sequentially and through a
+   * ParallelScanner, and checks that both see every row exactly once.
+   */
+  @Test
+  public void testParallelScanner() throws IOException {
+    // Create and load the table
+    byte [] name = Bytes.toBytes("testParallelScanner");
+    HTable table = TEST_UTIL.createTable(name, new byte[][] {FAMILY});
+    final int regionCnt = TEST_UTIL.createMultiRegions(table, FAMILY);
+    TEST_UTIL.waitUntilAllRegionsAssigned(regionCnt);
+    final int rowCount = TEST_UTIL.loadTable(table, FAMILY);
+    table.flushCommits();
+    TEST_UTIL.flush(name);
+
+    Scan[] rowsPerRegionScanner = new Scan[regionCnt];
+    Pair<byte[][],byte[][]> startAndEndKeys = table.getStartEndKeys();
+    assertEquals(regionCnt, startAndEndKeys.getFirst().length);
+    assertEquals(regionCnt, startAndEndKeys.getSecond().length);
+
+    // Get the rows per region in the sequential order.
+    int totalRowsScannedInSequential = 0;
+    for (int i = 0; i < regionCnt; i++) {
+      Scan s = new Scan();
+      s.setStartRow(startAndEndKeys.getFirst()[i]);
+      s.setStopRow(startAndEndKeys.getSecond()[i]);
+      s.addFamily(FAMILY);
+      rowsPerRegionScanner[i] = s;
+      totalRowsScannedInSequential += HBaseTestingUtility.countRows(table, s);
+    }
+    assertEquals(rowCount, totalRowsScannedInSequential);
+
+    // Construct a ParallelScanner
+    ParallelScanner parallelScanner =
+      new ParallelScanner(table, Arrays.asList(rowsPerRegionScanner), 10);
+
+    int totalRowScannedInParallel = 0;
+    // Distinct row keys seen, to detect rows returned twice by overlapping scanners.
+    Set<byte[]> distinctRows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    try {
+      parallelScanner.initialize();
+      List<Result> results;
+      while (!(results = parallelScanner.next()).isEmpty()) {
+        totalRowScannedInParallel += results.size();
+        for (Result result : results) {
+          distinctRows.add(result.getRow());
+        }
+      }
+    } finally {
+      // Always release the scanners and the thread pool, even if the scan fails.
+      parallelScanner.close();
+    }
+    assertEquals(rowCount, totalRowScannedInParallel);
+    assertEquals(rowCount, distinctRows.size());
+  }
+}