You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by dd...@apache.org on 2014/05/22 19:02:42 UTC

git commit: HBASE-11214. Fixes for scans on a replicated table

Repository: hbase
Updated Branches:
  refs/heads/hbase-10070 d02bb538d -> 97b7df274


HBASE-11214. Fixes for scans on a replicated table


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/97b7df27
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/97b7df27
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/97b7df27

Branch: refs/heads/hbase-10070
Commit: 97b7df274c27ae1075d0782ad31cf284362eaf48
Parents: d02bb53
Author: Devaraj Das <dd...@Devaraj-Dass-MacBook-Pro-2.local>
Authored: Thu May 22 10:01:50 2014 -0700
Committer: Devaraj Das <dd...@Devaraj-Dass-MacBook-Pro-2.local>
Committed: Thu May 22 10:01:50 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClientScanner.java      | 71 +-------------------
 .../hadoop/hbase/client/ClientSmallScanner.java | 57 ++--------------
 .../org/apache/hadoop/hbase/client/HTable.java  |  2 +-
 .../hbase/client/ReversedClientScanner.java     | 19 +-----
 .../client/ScannerCallableWithReplicas.java     | 12 ++--
 5 files changed, 15 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 3bdce36..14a5608 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -48,8 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  * If there are multiple regions in a table, this scanner will iterate
  * through them all.
  */
-@InterfaceAudience.Public
-@InterfaceStability.Stable
+@InterfaceAudience.Private
 public class ClientScanner extends AbstractClientScanner {
     private final Log LOG = LogFactory.getLog(this.getClass());
     protected Scan scan;
@@ -99,74 +98,6 @@ public class ClientScanner extends AbstractClientScanner {
           new RpcRetryingCallerFactory(conf), pool, primaryOperationTimeout);
     }
 
-    /**
-     * Create a new ClientScanner for the specified table.
-     * Note that the passed {@link Scan}'s start row maybe changed.
-     * 
-     * @param conf
-     * @param scan
-     * @param tableName
-     * @param connection
-     * @param rpcFactory
-     * @throws IOException
-     */
-    public ClientScanner(final Configuration conf, final Scan scan,
-        final TableName tableName, ClusterConnection connection, 
-        RpcRetryingCallerFactory rpcFactory)
-            throws IOException {
-      this(conf, scan, tableName, connection, rpcFactory, null, 0);
-    }
-
-    /**
-     * Create a new ClientScanner for the specified table. A ClusterConnection will be
-     * retrieved using the passed Configuration.
-     * Note that the passed {@link Scan}'s start row maybe changed.
-     *
-     * @param conf The {@link Configuration} to use.
-     * @param scan {@link Scan} to use in this scanner
-     * @param tableName The table that we wish to scan
-     * @throws IOException
-     */
-    public ClientScanner(final Configuration conf, final Scan scan,
-        final TableName tableName) throws IOException {
-      this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf));
-    }
-
-    /**
-     * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName)}
-     */
-    @Deprecated
-    public ClientScanner(final Configuration conf, final Scan scan,
-        final byte [] tableName) throws IOException {
-      this(conf, scan, TableName.valueOf(tableName));
-    }
-
-
-    /**
-     * Create a new ClientScanner for the specified table
-     * Note that the passed {@link Scan}'s start row maybe changed.
-     *
-     * @param conf The {@link Configuration} to use.
-     * @param scan {@link Scan} to use in this scanner
-     * @param tableName The table that we wish to scan
-     * @param connection Connection identifying the cluster
-     * @throws IOException
-     */
-  public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
-      ClusterConnection connection) throws IOException {
-    this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), null, 0);
-  }
-
-  /**
-   * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName, HConnection)}
-   */
-  @Deprecated
-  public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName,
-      ClusterConnection connection) throws IOException {
-    this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
-        null, 0);
-  }
-
   /**
    * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
    * row maybe changed changed.

http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
index ca2f431..20a3b9b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
@@ -48,8 +48,7 @@ import com.google.protobuf.ServiceException;
  * 
  * For small scan, it will get better performance than {@link ClientScanner}
  */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
+@InterfaceAudience.Private
 public class ClientSmallScanner extends ClientScanner {
   private final Log LOG = LogFactory.getLog(this.getClass());
   private ScannerCallableWithReplicas smallScanCallable = null;
@@ -58,36 +57,6 @@ public class ClientSmallScanner extends ClientScanner {
   private byte[] skipRowOfFirstResult = null;
 
   /**
-   * Create a new ClientSmallScanner for the specified table. An HConnection
-   * will be retrieved using the passed Configuration. Note that the passed
-   * {@link Scan} 's start row maybe changed.
-   * 
-   * @param conf The {@link Configuration} to use.
-   * @param scan {@link Scan} to use in this scanner
-   * @param tableName The table that we wish to rangeGet
-   * @throws IOException
-   */
-  public ClientSmallScanner(final Configuration conf, final Scan scan,
-      final TableName tableName) throws IOException {
-    this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf));
-  }
-
-  /**
-   * Create a new ClientSmallScanner for the specified table. An HConnection
-   * will be retrieved using the passed Configuration. Note that the passed
-   * {@link Scan} 's start row maybe changed.
-   * @param conf
-   * @param scan
-   * @param tableName
-   * @param connection
-   * @throws IOException
-   */
-  public ClientSmallScanner(final Configuration conf, final Scan scan,
-      final TableName tableName, ClusterConnection connection) throws IOException {
-    this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf));
-  }
-
-  /**
    * Create a new ClientSmallScanner for the specified table. Note that the passed
    * {@link Scan} 's start row maybe changed. 
    * @param conf
@@ -101,24 +70,8 @@ public class ClientSmallScanner extends ClientScanner {
   public ClientSmallScanner(final Configuration conf, final Scan scan,
       final TableName tableName, ClusterConnection connection, ExecutorService pool,
       int primaryOperationTimeout) throws IOException {
-    super(conf, scan, tableName, connection, pool, primaryOperationTimeout);
-  }
-
-  /**
-   * Create a new ShortClientScanner for the specified table Note that the
-   * passed {@link Scan}'s start row maybe changed changed.
-   * 
-   * @param conf The {@link Configuration} to use.
-   * @param scan {@link Scan} to use in this scanner
-   * @param tableName The table that we wish to rangeGet
-   * @param connection Connection identifying the cluster
-   * @param rpcFactory
-   * @throws IOException
-   */
-  public ClientSmallScanner(final Configuration conf, final Scan scan,
-      final TableName tableName, ClusterConnection connection,
-      RpcRetryingCallerFactory rpcFactory) throws IOException {
-    super(conf, scan, tableName, connection, rpcFactory);
+    super(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), pool,
+           primaryOperationTimeout);
   }
 
   @Override
@@ -196,6 +149,7 @@ public class ClientSmallScanner extends ClientScanner {
 
     @Override
     public Result[] call() throws IOException {
+      if (this.closed) return null;
       if (Thread.interrupted()) {
         throw new InterruptedIOException();
       }
@@ -217,9 +171,6 @@ public class ClientSmallScanner extends ClientScanner {
     public ScannerCallable getScannerCallableForReplica(int id) {
       return new SmallScannerCallable(id, this.getCaching());
     }
-
-    @Override
-    public void setClose(){}
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index d759757..c80a1f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -129,7 +129,7 @@ public class HTable implements HTableInterface {
   protected long currentWriteBufferSize;
   protected int scannerCaching;
   private int maxKeyValueSize;
-  private ExecutorService pool;  // For Multi
+  private ExecutorService pool;  // For Multi & Scan
   private boolean closed;
   private int operationTimeout;
   private int retries;

http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index c1940ae..0677d57 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -36,26 +36,12 @@ import org.apache.hadoop.hbase.util.Bytes;
 /**
  * A reversed client scanner which support backward scanning
  */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
+@InterfaceAudience.Private
 public class ReversedClientScanner extends ClientScanner {
   private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class);
   // A byte array in which all elements are the max byte, and it is used to
   // construct closest front row
   static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
-  /**
-   * Create a new ReversibleClientScanner for the specified table Note that the
-   * passed {@link Scan}'s start row maybe changed.
-   * @param conf The {@link Configuration} to use.
-   * @param scan {@link Scan} to use in this scanner
-   * @param tableName The table that we wish to scan
-   * @param connection Connection identifying the cluster
-   * @throws IOException
-   */
-  public ReversedClientScanner(Configuration conf, Scan scan,
-      TableName tableName, ClusterConnection connection) throws IOException {
-    super(conf, scan, tableName, connection);
-  }
 
   /**
    * Create a new ReversibleClientScanner for the specified table Note that the
@@ -71,7 +57,8 @@ public class ReversedClientScanner extends ClientScanner {
   public ReversedClientScanner(Configuration conf, Scan scan,
       TableName tableName, ClusterConnection connection, ExecutorService pool,
       int primaryOperationTimeout) throws IOException {
-    super(conf, scan, tableName, connection, pool, primaryOperationTimeout);
+    super(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), pool,
+          primaryOperationTimeout);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 14a3646..b79d6fb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -132,9 +132,9 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
         RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
         currentScannerCallable.getRow());
+
     // allocate a boundedcompletion pool of some multiple of number of replicas.
-    // We want accommodate the "scan" RPC call and the "close" RPC call (we schedule "close"
-    // RPCs for unneeded replica scans using the same pool)
+    // We want to accommodate some RPCs for redundant replica scans (that are still in progress)
     BoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
         new BoundedCompletionService<Pair<Result[], ScannerCallable>>(pool, rl.size() * 5);
 
@@ -151,7 +151,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       if (f != null) {
         Pair<Result[], ScannerCallable> r = f.get();
         if (r != null && r.getSecond() != null) {
-          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs);
+          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
         }
         return r == null ? null : r.getFirst(); //great we got a response
       }
@@ -175,7 +175,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
           Future<Pair<Result[], ScannerCallable>> f = cs.take();
           Pair<Result[], ScannerCallable> r = f.get();
           if (r != null && r.getSecond() != null) {
-            updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs);
+            updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
           }
           return r == null ? null : r.getFirst(); // great we got an answer
         } catch (ExecutionException e) {
@@ -204,7 +204,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
   }
 
   private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
-      AtomicBoolean done, BoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
+      AtomicBoolean done, ExecutorService pool) {
     if (done.compareAndSet(false, true)) {
       if (currentScannerCallable != scanner) replicaSwitched.set(true);
       currentScannerCallable = scanner;
@@ -226,7 +226,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
         // the table is closed (when the awaitTermination of the underlying pool is called)
         s.setClose();
         RetryingRPC r = new RetryingRPC(s);
-        cs.submit(r);
+        pool.submit(r);
       }
       // now clear outstandingCallables since we scheduled a close for all the contained scanners
       outstandingCallables.clear();