You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by dd...@apache.org on 2014/05/22 19:02:42 UTC
git commit: HBASE-11214. Fixes for scans on a replicated table
Repository: hbase
Updated Branches:
refs/heads/hbase-10070 d02bb538d -> 97b7df274
HBASE-11214. Fixes for scans on a replicated table
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/97b7df27
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/97b7df27
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/97b7df27
Branch: refs/heads/hbase-10070
Commit: 97b7df274c27ae1075d0782ad31cf284362eaf48
Parents: d02bb53
Author: Devaraj Das <dd...@Devaraj-Dass-MacBook-Pro-2.local>
Authored: Thu May 22 10:01:50 2014 -0700
Committer: Devaraj Das <dd...@Devaraj-Dass-MacBook-Pro-2.local>
Committed: Thu May 22 10:01:50 2014 -0700
----------------------------------------------------------------------
.../hadoop/hbase/client/ClientScanner.java | 71 +-------------------
.../hadoop/hbase/client/ClientSmallScanner.java | 57 ++--------------
.../org/apache/hadoop/hbase/client/HTable.java | 2 +-
.../hbase/client/ReversedClientScanner.java | 19 +-----
.../client/ScannerCallableWithReplicas.java | 12 ++--
5 files changed, 15 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 3bdce36..14a5608 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -48,8 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* If there are multiple regions in a table, this scanner will iterate
* through them all.
*/
-@InterfaceAudience.Public
-@InterfaceStability.Stable
+@InterfaceAudience.Private
public class ClientScanner extends AbstractClientScanner {
private final Log LOG = LogFactory.getLog(this.getClass());
protected Scan scan;
@@ -99,74 +98,6 @@ public class ClientScanner extends AbstractClientScanner {
new RpcRetryingCallerFactory(conf), pool, primaryOperationTimeout);
}
- /**
- * Create a new ClientScanner for the specified table.
- * Note that the passed {@link Scan}'s start row maybe changed.
- *
- * @param conf
- * @param scan
- * @param tableName
- * @param connection
- * @param rpcFactory
- * @throws IOException
- */
- public ClientScanner(final Configuration conf, final Scan scan,
- final TableName tableName, ClusterConnection connection,
- RpcRetryingCallerFactory rpcFactory)
- throws IOException {
- this(conf, scan, tableName, connection, rpcFactory, null, 0);
- }
-
- /**
- * Create a new ClientScanner for the specified table. A ClusterConnection will be
- * retrieved using the passed Configuration.
- * Note that the passed {@link Scan}'s start row maybe changed.
- *
- * @param conf The {@link Configuration} to use.
- * @param scan {@link Scan} to use in this scanner
- * @param tableName The table that we wish to scan
- * @throws IOException
- */
- public ClientScanner(final Configuration conf, final Scan scan,
- final TableName tableName) throws IOException {
- this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf));
- }
-
- /**
- * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName)}
- */
- @Deprecated
- public ClientScanner(final Configuration conf, final Scan scan,
- final byte [] tableName) throws IOException {
- this(conf, scan, TableName.valueOf(tableName));
- }
-
-
- /**
- * Create a new ClientScanner for the specified table
- * Note that the passed {@link Scan}'s start row maybe changed.
- *
- * @param conf The {@link Configuration} to use.
- * @param scan {@link Scan} to use in this scanner
- * @param tableName The table that we wish to scan
- * @param connection Connection identifying the cluster
- * @throws IOException
- */
- public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
- ClusterConnection connection) throws IOException {
- this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), null, 0);
- }
-
- /**
- * @deprecated Use {@link #ClientScanner(Configuration, Scan, TableName, HConnection)}
- */
- @Deprecated
- public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName,
- ClusterConnection connection) throws IOException {
- this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
- null, 0);
- }
-
/**
* Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
* row maybe changed changed.
http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
index ca2f431..20a3b9b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
@@ -48,8 +48,7 @@ import com.google.protobuf.ServiceException;
*
* For small scan, it will get better performance than {@link ClientScanner}
*/
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
+@InterfaceAudience.Private
public class ClientSmallScanner extends ClientScanner {
private final Log LOG = LogFactory.getLog(this.getClass());
private ScannerCallableWithReplicas smallScanCallable = null;
@@ -58,36 +57,6 @@ public class ClientSmallScanner extends ClientScanner {
private byte[] skipRowOfFirstResult = null;
/**
- * Create a new ClientSmallScanner for the specified table. An HConnection
- * will be retrieved using the passed Configuration. Note that the passed
- * {@link Scan} 's start row maybe changed.
- *
- * @param conf The {@link Configuration} to use.
- * @param scan {@link Scan} to use in this scanner
- * @param tableName The table that we wish to rangeGet
- * @throws IOException
- */
- public ClientSmallScanner(final Configuration conf, final Scan scan,
- final TableName tableName) throws IOException {
- this(conf, scan, tableName, ConnectionManager.getConnectionInternal(conf));
- }
-
- /**
- * Create a new ClientSmallScanner for the specified table. An HConnection
- * will be retrieved using the passed Configuration. Note that the passed
- * {@link Scan} 's start row maybe changed.
- * @param conf
- * @param scan
- * @param tableName
- * @param connection
- * @throws IOException
- */
- public ClientSmallScanner(final Configuration conf, final Scan scan,
- final TableName tableName, ClusterConnection connection) throws IOException {
- this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf));
- }
-
- /**
* Create a new ClientSmallScanner for the specified table. Note that the passed
* {@link Scan} 's start row maybe changed.
* @param conf
@@ -101,24 +70,8 @@ public class ClientSmallScanner extends ClientScanner {
public ClientSmallScanner(final Configuration conf, final Scan scan,
final TableName tableName, ClusterConnection connection, ExecutorService pool,
int primaryOperationTimeout) throws IOException {
- super(conf, scan, tableName, connection, pool, primaryOperationTimeout);
- }
-
- /**
- * Create a new ShortClientScanner for the specified table Note that the
- * passed {@link Scan}'s start row maybe changed changed.
- *
- * @param conf The {@link Configuration} to use.
- * @param scan {@link Scan} to use in this scanner
- * @param tableName The table that we wish to rangeGet
- * @param connection Connection identifying the cluster
- * @param rpcFactory
- * @throws IOException
- */
- public ClientSmallScanner(final Configuration conf, final Scan scan,
- final TableName tableName, ClusterConnection connection,
- RpcRetryingCallerFactory rpcFactory) throws IOException {
- super(conf, scan, tableName, connection, rpcFactory);
+ super(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), pool,
+ primaryOperationTimeout);
}
@Override
@@ -196,6 +149,7 @@ public class ClientSmallScanner extends ClientScanner {
@Override
public Result[] call() throws IOException {
+ if (this.closed) return null;
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
@@ -217,9 +171,6 @@ public class ClientSmallScanner extends ClientScanner {
public ScannerCallable getScannerCallableForReplica(int id) {
return new SmallScannerCallable(id, this.getCaching());
}
-
- @Override
- public void setClose(){}
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index d759757..c80a1f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -129,7 +129,7 @@ public class HTable implements HTableInterface {
protected long currentWriteBufferSize;
protected int scannerCaching;
private int maxKeyValueSize;
- private ExecutorService pool; // For Multi
+ private ExecutorService pool; // For Multi & Scan
private boolean closed;
private int operationTimeout;
private int retries;
http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index c1940ae..0677d57 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -36,26 +36,12 @@ import org.apache.hadoop.hbase.util.Bytes;
/**
* A reversed client scanner which support backward scanning
*/
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
+@InterfaceAudience.Private
public class ReversedClientScanner extends ClientScanner {
private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class);
// A byte array in which all elements are the max byte, and it is used to
// construct closest front row
static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
- /**
- * Create a new ReversibleClientScanner for the specified table Note that the
- * passed {@link Scan}'s start row maybe changed.
- * @param conf The {@link Configuration} to use.
- * @param scan {@link Scan} to use in this scanner
- * @param tableName The table that we wish to scan
- * @param connection Connection identifying the cluster
- * @throws IOException
- */
- public ReversedClientScanner(Configuration conf, Scan scan,
- TableName tableName, ClusterConnection connection) throws IOException {
- super(conf, scan, tableName, connection);
- }
/**
* Create a new ReversibleClientScanner for the specified table Note that the
@@ -71,7 +57,8 @@ public class ReversedClientScanner extends ClientScanner {
public ReversedClientScanner(Configuration conf, Scan scan,
TableName tableName, ClusterConnection connection, ExecutorService pool,
int primaryOperationTimeout) throws IOException {
- super(conf, scan, tableName, connection, pool, primaryOperationTimeout);
+ super(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf), pool,
+ primaryOperationTimeout);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/97b7df27/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 14a3646..b79d6fb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -132,9 +132,9 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
currentScannerCallable.getRow());
+
// allocate a boundedcompletion pool of some multiple of number of replicas.
- // We want accommodate the "scan" RPC call and the "close" RPC call (we schedule "close"
- // RPCs for unneeded replica scans using the same pool)
+ // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
BoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
new BoundedCompletionService<Pair<Result[], ScannerCallable>>(pool, rl.size() * 5);
@@ -151,7 +151,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
if (f != null) {
Pair<Result[], ScannerCallable> r = f.get();
if (r != null && r.getSecond() != null) {
- updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs);
+ updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
}
return r == null ? null : r.getFirst(); //great we got a response
}
@@ -175,7 +175,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
Future<Pair<Result[], ScannerCallable>> f = cs.take();
Pair<Result[], ScannerCallable> r = f.get();
if (r != null && r.getSecond() != null) {
- updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, cs);
+ updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
}
return r == null ? null : r.getFirst(); // great we got an answer
} catch (ExecutionException e) {
@@ -204,7 +204,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
}
private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
- AtomicBoolean done, BoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
+ AtomicBoolean done, ExecutorService pool) {
if (done.compareAndSet(false, true)) {
if (currentScannerCallable != scanner) replicaSwitched.set(true);
currentScannerCallable = scanner;
@@ -226,7 +226,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// the table is closed (when the awaitTermination of the underlying pool is called)
s.setClose();
RetryingRPC r = new RetryingRPC(s);
- cs.submit(r);
+ pool.submit(r);
}
// now clear outstandingCallables since we scheduled a close for all the contained scanners
outstandingCallables.clear();