Posted to commits@hbase.apache.org by zh...@apache.org on 2017/01/22 08:57:40 UTC
hbase git commit: HBASE-17489 ClientScanner may send a next request to a RegionScanner which has been exhausted
Repository: hbase
Updated Branches:
refs/heads/master 9a9e3df85 -> 3abd13dac
HBASE-17489 ClientScanner may send a next request to a RegionScanner which has been exhausted
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3abd13da
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3abd13da
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3abd13da
Branch: refs/heads/master
Commit: 3abd13dacb57927bd44a47632f4bd0c2e2bb87ea
Parents: 9a9e3df
Author: zhangduo <zh...@apache.org>
Authored: Sun Jan 22 10:02:29 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Jan 22 16:39:00 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/client/ClientScanner.java | 162 ++--
.../hbase/client/ReversedClientScanner.java | 8 +-
.../hadoop/hbase/client/TestClientScanner.java | 29 +-
.../hbase/regionserver/RSRpcServices.java | 963 ++++++++++---------
4 files changed, 613 insertions(+), 549 deletions(-)
----------------------------------------------------------------------
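For readers skimming the patch: the client-side half of the fix is that ClientScanner now decides locally whether the current region scanner has been exhausted and, if so, closes it before moving on, instead of sending another next request to it. Below is a minimal sketch of that decision, mirroring the regionExhausted() check added in the diff; the ScanCallState interface here is illustrative only, not the patch's actual API.

interface ScanCallState {
  boolean isHeartbeatMessage();
  boolean hasMoreResultsContext();
  boolean getServerHasMoreResults();
}

final class RegionExhaustionCheck {
  // A region scanner counts as exhausted when the server signals end-of-scan (null results),
  // returns nothing without a heartbeat, or explicitly reports no more results in the region.
  static boolean regionExhausted(Object[] values, ScanCallState call) {
    if (values == null) {
      return true; // the whole scan is done, usually decided by a filter
    }
    if (values.length == 0 && !call.isHeartbeatMessage()) {
      return true; // nothing returned and not a heartbeat
    }
    // hint taken from ScanResponse.getMoreResultsInRegion
    return call.hasMoreResultsContext() && !call.getServerHasMoreResults();
  }
}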
http://git-wip-us.apache.org/repos/asf/hbase/blob/3abd13da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 283272a..ea91100 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -54,9 +54,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.util.Bytes;
/**
- * Implements the scanner interface for the HBase client.
- * If there are multiple regions in a table, this scanner will iterate
- * through them all.
+ * Implements the scanner interface for the HBase client. If there are multiple regions in a table,
+ * this scanner will iterate through them all.
*/
@InterfaceAudience.Private
public abstract class ClientScanner extends AbstractClientScanner {
@@ -229,15 +228,13 @@ public abstract class ClientScanner extends AbstractClientScanner {
return false; // unlikely.
}
- private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
- // If we have just switched replica, don't go to the next scanner yet. Rather, try
- // the scanner operations on the new replica, from the right point in the scan
- // Note that when we switched to a different replica we left it at a point
- // where we just did the "openScanner" with the appropriate startrow
- if (callable != null && callable.switchedToADifferentReplica()) return true;
- return nextScanner(nbRows, done);
+ protected final void closeScanner() throws IOException {
+ if (this.callable != null) {
+ this.callable.setClose();
+ call(callable, caller, scannerTimeout);
+ this.callable = null;
+ }
}
-
/*
* Gets a scanner for the next region. If this.currentRegion != null, then we will move to the
* endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). We will go no
@@ -248,11 +245,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
*/
protected boolean nextScanner(int nbRows, final boolean done) throws IOException {
// Close the previous scanner if it's open
- if (this.callable != null) {
- this.callable.setClose();
- call(callable, caller, scannerTimeout);
- this.callable = null;
- }
+ closeScanner();
// Where to start the next scanner
byte[] localStartKey;
@@ -371,6 +364,37 @@ public abstract class ClientScanner extends AbstractClientScanner {
return cache != null ? cache.size() : 0;
}
+ private boolean regionExhausted(Result[] values) {
+ // This means the server tells us the whole scan operation is done. Usually decided by filter.
+ if (values == null) {
+ return true;
+ }
+ // Not a heartbeat message and we get nothing, this means the region is exhausted
+ if (values.length == 0 && !callable.isHeartbeatMessage()) {
+ return true;
+ }
+ // Server tells us that it has no more results for this region. Notice that this flag comes
+ // from ScanResponse.getMoreResultsInRegion, not ScanResponse.getMoreResults. If the latter
+ // is false then we will get null values and quit in the first condition of this method.
+ if (callable.hasMoreResultsContext() && !callable.getServerHasMoreResults()) {
+ return true;
+ }
+ return false;
+ }
+
+ private void closeScannerIfExhausted(boolean exhausted) throws IOException {
+ if (exhausted) {
+ if (!partialResults.isEmpty()) {
+ // XXX: continue if there are partial results. But in fact server should not set
+ // hasMoreResults to false if there are partial results.
+ LOG.warn("Server tells us there is no more results for this region but we still have"
+ + " partialResults, this should not happen, retry on the current scanner anyway");
+ } else {
+ closeScanner();
+ }
+ }
+ }
+
/**
* Contact the servers to load more {@link Result}s in the cache.
*/
@@ -380,17 +404,18 @@ public abstract class ClientScanner extends AbstractClientScanner {
Result[] values = null;
long remainingResultSize = maxScannerResultSize;
int countdown = this.caching;
+ // This is possible if we just stopped at the boundary of a region in the previous call.
+ if (callable == null) {
+ if (!nextScanner(countdown, false)) {
+ return;
+ }
+ }
// We need to reset it if it's a new callable that was created with a countdown in nextScanner
callable.setCaching(this.caching);
// This flag is set when we want to skip the result returned. We do
// this when we reset scanner because it split under us.
boolean retryAfterOutOfOrderException = true;
- // We don't expect that the server will have more results for us if
- // it doesn't tell us otherwise. We rely on the size or count of results
- boolean serverHasMoreResults = false;
- boolean allResultsSkipped = false;
- do {
- allResultsSkipped = false;
+ for (;;) {
try {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
@@ -436,7 +461,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// Reset the startRow to the row we've seen last so that the new scanner starts at
// the correct row. Otherwise we may see previously returned rows again.
// (ScannerCallable by now has "relocated" the correct region)
- if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) {
+ if (!this.lastResult.isPartial() && scan.getBatch() < 0) {
if (scan.isReversed()) {
scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
} else {
@@ -461,7 +486,10 @@ public abstract class ClientScanner extends AbstractClientScanner {
// Set this to zero so we don't try and do an rpc and close on remote server when
// the exception we got was UnknownScanner or the Server is going down.
callable = null;
- // This continue will take us to while at end of loop where we will set up new scanner.
+ // reopen the scanner
+ if (!nextScanner(countdown, false)) {
+ break;
+ }
continue;
}
long currentTime = System.currentTimeMillis();
@@ -487,61 +515,58 @@ public abstract class ClientScanner extends AbstractClientScanner {
remainingResultSize -= estimatedHeapSizeOfResult;
addEstimatedSize(estimatedHeapSizeOfResult);
this.lastResult = rs;
- if (this.lastResult.isPartial() || scan.getBatch() > 0 ) {
+ if (this.lastResult.isPartial() || scan.getBatch() > 0) {
updateLastCellLoadedToCache(this.lastResult);
} else {
this.lastCellLoadedToCache = null;
}
}
- if (cache.isEmpty()) {
- // all result has been seen before, we need scan more.
- allResultsSkipped = true;
- continue;
- }
}
+ boolean exhausted = regionExhausted(values);
if (callable.isHeartbeatMessage()) {
- if (cache.size() > 0) {
+ if (!cache.isEmpty()) {
// Caller of this method just wants a Result. If we see a heartbeat message, it means
// processing of the scan is taking a long time server side. Rather than continue to
// loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
// unnecesary delays to the caller
if (LOG.isTraceEnabled()) {
LOG.trace("Heartbeat message received and cache contains Results."
- + " Breaking out of scan loop");
+ + " Breaking out of scan loop");
}
+ // we know that the region has not been exhausted yet so just break without calling
+ // closeScannerIfExhausted
break;
}
- continue;
}
-
- // We expect that the server won't have more results for us when we exhaust
- // the size (bytes or count) of the results returned. If the server *does* inform us that
- // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
- // get results is the moreResults context valid.
- if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
- // Only adhere to more server results when we don't have any partialResults
- // as it keeps the outer loop logic the same.
- serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty();
+ if (countdown <= 0) {
+ // we have enough results.
+ closeScannerIfExhausted(exhausted);
+ break;
}
- // Values == null means server-side filter has determined we must STOP
- // !partialResults.isEmpty() means that we are still accumulating partial Results for a
- // row. We should not change scanners before we receive all the partial Results for that
- // row.
- } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage())
- || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
- && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
- }
-
- /**
- * @param remainingResultSize
- * @param remainingRows
- * @param regionHasMoreResults
- * @return true when the current region has been exhausted. When the current region has been
- * exhausted, the region must be changed before scanning can continue
- */
- private boolean doneWithRegion(long remainingResultSize, int remainingRows,
- boolean regionHasMoreResults) {
- return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
+ if (remainingResultSize <= 0) {
+ if (!cache.isEmpty()) {
+ closeScannerIfExhausted(exhausted);
+ break;
+ } else {
+ // we have reached the max result size but we still can not find anything to return to the
+ // user. Reset the maxResultSize and try again.
+ remainingResultSize = maxScannerResultSize;
+ }
+ }
+ // we are done with the current region
+ if (exhausted) {
+ if (!partialResults.isEmpty()) {
+ // XXX: continue if there are partial results. But in fact server should not set
+ // hasMoreResults to false if there are partial results.
+ LOG.warn("Server tells us there is no more results for this region but we still have"
+ + " partialResults, this should not happen, retry on the current scanner anyway");
+ continue;
+ }
+ if (!nextScanner(countdown, values == null)) {
+ break;
+ }
+ }
+ }
}
protected void addEstimatedSize(long estimatedHeapSizeOfResult) {
@@ -566,9 +591,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
* @return the list of results that should be added to the cache.
* @throws IOException
*/
- protected List<Result>
- getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
- throws IOException {
+ protected List<Result> getResultsToAddToCache(Result[] resultsFromServer,
+ boolean heartbeatMessage) throws IOException {
int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
@@ -583,7 +607,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
// the batch size even though it may not be the last group of cells for that row.
if (allowPartials || isBatchSet) {
addResultsToList(resultsToAddToCache, resultsFromServer, 0,
- (null == resultsFromServer ? 0 : resultsFromServer.length));
+ (null == resultsFromServer ? 0 : resultsFromServer.length));
return resultsToAddToCache;
}
@@ -769,12 +793,12 @@ public abstract class ClientScanner extends AbstractClientScanner {
}
/**
- * Compare two Cells considering reversed scanner.
- * ReversedScanner only reverses rows, not columns.
+ * Compare two Cells considering reversed scanner. ReversedScanner only reverses rows, not
+ * columns.
*/
private int compare(Cell a, Cell b) {
- CellComparator comparator = currentRegion != null && currentRegion.isMetaRegion() ?
- CellComparator.META_COMPARATOR : CellComparator.COMPARATOR;
+ CellComparator comparator = currentRegion != null && currentRegion.isMetaRegion()
+ ? CellComparator.META_COMPARATOR : CellComparator.COMPARATOR;
int r = comparator.compareRows(a, b);
if (r != 0) {
return this.scan.isReversed() ? -r : r;
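From an application's point of view the ClientScanner change above is transparent; a normal scan loop over the public client API keeps working, it simply no longer issues extra next calls against a region the server already reported as exhausted. A small, self-contained usage sketch follows (the table name and default configuration are placeholders, not taken from the patch):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class ScanExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Table table = conn.getTable(TableName.valueOf("test_table"));
         ResultScanner scanner = table.getScanner(new Scan())) {
      for (Result result : scanner) {
        // each Result is one row assembled by ClientScanner.loadCache()
        System.out.println(result);
      }
    }
  }
}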
http://git-wip-us.apache.org/repos/asf/hbase/blob/3abd13da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index 390e236..e1a522a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -63,13 +63,7 @@ public class ReversedClientScanner extends ClientSimpleScanner {
protected boolean nextScanner(int nbRows, final boolean done)
throws IOException {
// Close the previous scanner if it's open
- if (this.callable != null) {
- this.callable.setClose();
- // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
- // we do a callWithRetries
- this.caller.callWithoutRetries(callable, scannerTimeout);
- this.callable = null;
- }
+ closeScanner();
// Where to start the next scanner
byte[] localStartKey;
http://git-wip-us.apache.org/repos/asf/hbase/blob/3abd13da/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index fd2a393..4319b9a 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -158,7 +158,8 @@ public class TestClientScanner {
ScannerCallableWithReplicas.class);
switch (count) {
case 0: // initialize
- case 2: // close
+ case 2: // detect no more results
+ case 3: // close
count++;
return null;
case 1:
@@ -184,8 +185,10 @@ public class TestClientScanner {
scanner.loadCache();
- // One more call due to initializeScannerInConstruction()
- inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
+ // One for initializeScannerInConstruction()
+ // One for fetching the results
+ // One for fetching null results and quit as we do not have moreResults hint.
+ inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt());
assertEquals(1, scanner.cache.size());
@@ -224,7 +227,8 @@ public class TestClientScanner {
case 1:
count++;
callable.setHasMoreResultsContext(true);
- callable.setServerHasMoreResults(false);
+ // if we set false here the implementation will trigger a close
+ callable.setServerHasMoreResults(true);
return results;
default:
throw new RuntimeException("Expected only 2 invocations");
@@ -291,7 +295,8 @@ public class TestClientScanner {
case 1:
count++;
callable.setHasMoreResultsContext(true);
- callable.setServerHasMoreResults(false);
+ // if we set false here the implementation will trigger a close
+ callable.setServerHasMoreResults(true);
return results;
default:
throw new RuntimeException("Expected only 2 invocations");
@@ -470,13 +475,14 @@ public class TestClientScanner {
Mockito.anyInt());
InOrder inOrder = Mockito.inOrder(caller);
+ scanner.setRpcFinished(true);
scanner.loadCache();
- inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
+ inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
Mockito.any(RetryingCallable.class), Mockito.anyInt());
- assertEquals(1, scanner.cache.size());
+ assertEquals(2, scanner.cache.size());
Result r = scanner.cache.poll();
assertNotNull(r);
CellScanner cs = r.cellScanner();
@@ -484,15 +490,6 @@ public class TestClientScanner {
assertEquals(kv1, cs.current());
assertFalse(cs.advance());
- scanner.setRpcFinished(true);
-
- inOrder = Mockito.inOrder(caller);
-
- scanner.loadCache();
-
- inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
- Mockito.any(RetryingCallable.class), Mockito.anyInt());
-
r = scanner.cache.poll();
assertNotNull(r);
cs = r.cellScanner();
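The server-side half of the fix, in RSRpcServices below, replaces the unconditional nextCallSeq increment with a compare-and-set so that two scan RPCs carrying the same sequence number cannot both advance the scanner. A sketch of the idea in isolation, with illustrative names rather than the patch's RegionScannerHolder:

import java.util.concurrent.atomic.AtomicLong;

final class CallSeqGuard {
  private final AtomicLong nextCallSeq = new AtomicLong(0);

  // Only the single request whose sequence matches the expected value wins the CAS;
  // a duplicate or out-of-order request fails and can be rejected.
  boolean tryAdvance(long clientSeq) {
    return nextCallSeq.compareAndSet(clientSeq, clientSeq + 1);
  }

  long expected() {
    return nextCallSeq.get();
  }
}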
http://git-wip-us.apache.org/repos/asf/hbase/blob/3abd13da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 1e9f16b..a072dce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -38,9 +38,11 @@ import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
+import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -77,7 +79,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
-import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
@@ -96,6 +97,27 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.quotas.OperationQuota;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.regionserver.Leases.Lease;
+import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.regionserver.Region.Operation;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
+import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
+import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
+import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionForSplitOrMergeResponse;
@@ -107,10 +129,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegion
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
@@ -134,9 +156,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
@@ -177,18 +196,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
-import org.apache.hadoop.hbase.quotas.OperationQuota;
-import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
-import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
-import org.apache.hadoop.hbase.regionserver.Leases.Lease;
-import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
-import org.apache.hadoop.hbase.regionserver.Region.Operation;
-import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
-import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
-import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
-import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -201,13 +208,6 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.zookeeper.KeeperException;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
-
/**
* Implements the regionserver RPC services.
*/
@@ -260,8 +260,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private final PriorityFunction priority;
private final AtomicLong scannerIdGen = new AtomicLong(0L);
- private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
- new ConcurrentHashMap<String, RegionScannerHolder>();
+ private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
/**
* The lease timeout period for client scanners (milliseconds).
@@ -281,11 +280,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
/**
* An Rpc callback for closing a RegionScanner.
*/
- static class RegionScannerCloseCallBack implements RpcCallback {
+ private static final class RegionScannerCloseCallBack implements RpcCallback {
private final RegionScanner scanner;
- public RegionScannerCloseCallBack(RegionScanner scanner){
+ public RegionScannerCloseCallBack(RegionScanner scanner) {
this.scanner = scanner;
}
@@ -347,27 +346,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
/**
* Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
*/
- private static class RegionScannerHolder {
- private AtomicLong nextCallSeq = new AtomicLong(0);
- private RegionScanner s;
- private Region r;
- final RpcCallback closeCallBack;
- final RpcCallback shippedCallback;
-
- public RegionScannerHolder(RegionScanner s, Region r, RpcCallback closeCallBack,
- RpcCallback shippedCallback) {
+ private static final class RegionScannerHolder {
+
+ private final AtomicLong nextCallSeq = new AtomicLong(0);
+ private final String scannerName;
+ private final RegionScanner s;
+ private final Region r;
+ private final RpcCallback closeCallBack;
+ private final RpcCallback shippedCallback;
+
+ public RegionScannerHolder(String scannerName, RegionScanner s, Region r,
+ RpcCallback closeCallBack, RpcCallback shippedCallback) {
+ this.scannerName = scannerName;
this.s = s;
this.r = r;
this.closeCallBack = closeCallBack;
this.shippedCallback = shippedCallback;
}
- private long getNextCallSeq() {
+ public long getNextCallSeq() {
return nextCallSeq.get();
}
- private void incNextCallSeq() {
- nextCallSeq.incrementAndGet();
+ public boolean incNextCallSeq(long currentSeq) {
+ // Use CAS to prevent multiple scan requests from running on the same scanner.
+ return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
}
}
@@ -476,19 +479,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
- private void addResults(final ScanResponse.Builder builder, final List<Result> results,
- final RpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
+ private void addResults(ScanResponse.Builder builder, List<Result> results,
+ HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
builder.setStale(!isDefaultRegion);
- if (results == null || results.isEmpty()) return;
+ if (results.isEmpty()) return;
if (clientCellBlockSupported) {
for (Result res : results) {
builder.addCellsPerResult(res.size());
builder.addPartialFlagPerResult(res.isPartial());
}
- ((HBaseRpcController)controller).
- setCellScanner(CellUtil.createCellScanner(results));
+ controller.setCellScanner(CellUtil.createCellScanner(results));
} else {
- for (Result res: results) {
+ for (Result res : results) {
ClientProtos.Result pbr = ProtobufUtil.toResult(res);
builder.addResults(pbr);
}
@@ -1131,6 +1133,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
+ public
RegionScanner getScanner(long scannerId) {
String scannerIdString = Long.toString(scannerId);
RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
@@ -1202,10 +1205,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return lastBlock;
}
- RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
+ private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
throws LeaseStillHeldException {
Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
- new ScannerListener(scannerName));
+ new ScannerListener(scannerName));
RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease);
RpcCallback closeCallback;
if (s instanceof RpcCallback) {
@@ -1213,7 +1216,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} else {
closeCallback = new RegionScannerCloseCallBack(s);
}
- RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback);
+ RegionScannerHolder rsh =
+ new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback);
RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
return rsh;
@@ -2610,444 +2614,498 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
- /**
- * Scan data in a table.
- *
- * @param controller the RPC controller
- * @param request the scan request
- * @throws ServiceException
- */
- @Override
- public ScanResponse scan(final RpcController controller, final ScanRequest request)
- throws ServiceException {
- OperationQuota quota = null;
- Leases.Lease lease = null;
- String scannerName = null;
- try {
- if (!request.hasScannerId() && !request.hasScan()) {
- throw new DoNotRetryIOException(
- "Missing required input: scannerId or scan");
- }
- long scannerId = -1;
- if (request.hasScannerId()) {
- scannerId = request.getScannerId();
- scannerName = String.valueOf(scannerId);
- }
- try {
- checkOpen();
- } catch (IOException e) {
- // If checkOpen failed, server not running or filesystem gone,
- // cancel this lease; filesystem is gone or we're closing or something.
- if (scannerName != null) {
- LOG.debug("Server shutting down and client tried to access missing scanner "
- + scannerName);
- if (regionServer.leases != null) {
- try {
- regionServer.leases.cancelLease(scannerName);
- } catch (LeaseException le) {
- // No problem, ignore
- if (LOG.isTraceEnabled()) {
- LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
- }
- }
- }
- }
- throw e;
- }
- requestCount.increment();
- rpcScanRequestCount.increment();
-
- int ttl = 0;
- Region region = null;
- RegionScanner scanner = null;
- RegionScannerHolder rsh = null;
- boolean moreResults = true;
- boolean closeScanner = false;
- boolean isSmallScan = false;
- ScanResponse.Builder builder = ScanResponse.newBuilder();
- if (request.hasCloseScanner()) {
- closeScanner = request.getCloseScanner();
- }
- int rows = closeScanner ? 0 : 1;
- if (request.hasNumberOfRows()) {
- rows = request.getNumberOfRows();
- }
- if (request.hasScannerId()) {
- rsh = scanners.get(scannerName);
- if (rsh == null) {
- LOG.warn("Client tried to access missing scanner " + scannerName);
- throw new UnknownScannerException(
+ // This is used to remain compatible with the old client implementation. Consider removing it
+ // if we decide to drop support for clients that still send a close request to a region scanner
+ // which has already been exhausted.
+ @Deprecated
+ private static final IOException SCANNER_ALREADY_CLOSED = new IOException() {
+
+ private static final long serialVersionUID = -4305297078988180130L;
+
+ @Override
+ public Throwable fillInStackTrace() {
+ return this;
+ }
+ };
+
+ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
+ String scannerName = Long.toString(request.getScannerId());
+ RegionScannerHolder rsh = scanners.get(scannerName);
+ if (rsh == null) {
+ // just ignore the close request if the scanner does not exist.
+ if (request.hasCloseScanner() && request.getCloseScanner()) {
+ throw SCANNER_ALREADY_CLOSED;
+ } else {
+ LOG.warn("Client tried to access missing scanner " + scannerName);
+ throw new UnknownScannerException(
"Unknown scanner '" + scannerName + "'. This can happen due to any of the following "
+ "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of "
+ "long wait between consecutive client checkins, c) Server may be closing down, "
+ "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a "
+ "possible fix would be increasing the value of"
+ "'hbase.client.scanner.timeout.period' configuration.");
+ }
+ }
+ HRegionInfo hri = rsh.s.getRegionInfo();
+ // Yes, should be the same instance
+ if (regionServer.getOnlineRegion(hri.getRegionName()) != rsh.r) {
+ String msg = "Region was re-opened after the scanner" + scannerName + " was created: "
+ + hri.getRegionNameAsString();
+ LOG.warn(msg + ", closing...");
+ scanners.remove(scannerName);
+ try {
+ rsh.s.close();
+ } catch (IOException e) {
+ LOG.warn("Getting exception closing " + scannerName, e);
+ } finally {
+ try {
+ regionServer.leases.cancelLease(scannerName);
+ } catch (LeaseException e) {
+ LOG.warn("Getting exception closing " + scannerName, e);
}
- scanner = rsh.s;
- HRegionInfo hri = scanner.getRegionInfo();
- region = regionServer.getRegion(hri.getRegionName());
- if (region != rsh.r) { // Yes, should be the same instance
- throw new NotServingRegionException("Region was re-opened after the scanner"
- + scannerName + " was created: " + hri.getRegionNameAsString());
- }
- } else {
- region = getRegion(request.getRegion());
- ClientProtos.Scan protoScan = request.getScan();
- boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
- Scan scan = ProtobufUtil.toScan(protoScan);
- // if the request doesn't set this, get the default region setting.
- if (!isLoadingCfsOnDemandSet) {
- scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
- }
+ }
+ throw new NotServingRegionException(msg);
+ }
+ return rsh;
+ }
- isSmallScan = scan.isSmall();
- if (!scan.hasFamilies()) {
- // Adding all families to scanner
- for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
- scan.addFamily(family);
- }
- }
+ private Pair<RegionScannerHolder, Boolean> newRegionScanner(ScanRequest request,
+ ScanResponse.Builder builder) throws IOException {
+ Region region = getRegion(request.getRegion());
+ ClientProtos.Scan protoScan = request.getScan();
+ boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
+ Scan scan = ProtobufUtil.toScan(protoScan);
+ // if the request doesn't set this, get the default region setting.
+ if (!isLoadingCfsOnDemandSet) {
+ scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
+ }
- if (region.getCoprocessorHost() != null) {
- scanner = region.getCoprocessorHost().preScannerOpen(scan);
- }
- if (scanner == null) {
- scanner = region.getScanner(scan);
- }
- if (region.getCoprocessorHost() != null) {
- scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
- }
- scannerId = this.scannerIdGen.incrementAndGet();
- scannerName = String.valueOf(scannerId);
- rsh = addScanner(scannerName, scanner, region);
- ttl = this.scannerLeaseTimeoutPeriod;
- builder.setMvccReadPoint(scanner.getMvccReadPoint());
- }
- if (request.hasRenew() && request.getRenew()) {
- rsh = scanners.get(scannerName);
- lease = regionServer.leases.removeLease(scannerName);
- if (lease != null && rsh != null) {
- regionServer.leases.addLease(lease);
- // Increment the nextCallSeq value which is the next expected from client.
- rsh.incNextCallSeq();
- }
- return builder.build();
+ if (!scan.hasFamilies()) {
+ // Adding all families to scanner
+ for (byte[] family : region.getTableDesc().getFamiliesKeys()) {
+ scan.addFamily(family);
}
- RpcCallContext context = RpcServer.getCurrentCall();
- Object lastBlock = null;
+ }
+ RegionScanner scanner = null;
+ if (region.getCoprocessorHost() != null) {
+ scanner = region.getCoprocessorHost().preScannerOpen(scan);
+ }
+ if (scanner == null) {
+ scanner = region.getScanner(scan);
+ }
+ if (region.getCoprocessorHost() != null) {
+ scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
+ }
+ long scannerId = this.scannerIdGen.incrementAndGet();
+ builder.setScannerId(scannerId);
+ builder.setMvccReadPoint(scanner.getMvccReadPoint());
+ builder.setTtl(scannerLeaseTimeoutPeriod);
+ String scannerName = String.valueOf(scannerId);
+ return Pair.newPair(addScanner(scannerName, scanner, region), scan.isSmall());
+ }
- quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
- long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
+ private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
+ throws OutOfOrderScannerNextException {
+ // if nextCallSeq does not match throw Exception straight away. This needs to be
+ // performed even before checking of Lease.
+ // See HBASE-5974
+ if (request.hasNextCallSeq()) {
+ long callSeq = request.getNextCallSeq();
+ if (!rsh.incNextCallSeq(callSeq)) {
+ throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.getNextCallSeq()
+ + " But the nextCallSeq got from client: " + request.getNextCallSeq() + "; request="
+ + TextFormat.shortDebugString(request));
+ }
+ }
+ }
- if (rows > 0) {
- // if nextCallSeq does not match throw Exception straight away. This needs to be
- // performed even before checking of Lease.
- // See HBASE-5974
- if (request.hasNextCallSeq()) {
- if (rsh != null) {
- if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
- throw new OutOfOrderScannerNextException(
- "Expected nextCallSeq: " + rsh.getNextCallSeq()
- + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
- "; request=" + TextFormat.shortDebugString(request));
- }
- // Increment the nextCallSeq value which is the next expected from client.
- rsh.incNextCallSeq();
+ private void addScannerLeaseBack(Leases.Lease lease) {
+ try {
+ regionServer.leases.addLease(lease);
+ } catch (LeaseStillHeldException e) {
+ // should not happen as the scanner id is unique.
+ throw new AssertionError(e);
+ }
+ }
+
+ private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) {
+ // Set the time limit to be half of the more restrictive timeout value (one of the
+ // timeout values must be positive). In the event that both values are positive, the
+ // more restrictive of the two is used to calculate the limit.
+ if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
+ long timeLimitDelta;
+ if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
+ timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
+ } else {
+ timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
+ }
+ if (controller != null && controller.getCallTimeout() > 0) {
+ timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout());
+ }
+ // Use half of whichever timeout value was more restrictive... But don't allow
+ // the time limit to be less than the allowable minimum (could cause an
+ // immediate timeout before scanning any data).
+ timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
+ // XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a
+ // ManualEnvironmentEdge. Consider using System.nanoTime instead.
+ return System.currentTimeMillis() + timeLimitDelta;
+ }
+ // Default value of timeLimit is negative to indicate no timeLimit should be
+ // enforced.
+ return -1L;
+ }
+
+ // return whether we have more results in region.
+ private boolean scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
+ boolean isSmallScan, long maxQuotaResultSize, int rows, List<Result> results,
+ ScanResponse.Builder builder, MutableObject lastBlock, RpcCallContext context)
+ throws IOException {
+ Region region = rsh.r;
+ RegionScanner scanner = rsh.s;
+ long maxResultSize;
+ if (scanner.getMaxResultSize() > 0) {
+ maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
+ } else {
+ maxResultSize = maxQuotaResultSize;
+ }
+ // This is cells inside a row. Default size is 10 so if many versions or many cfs,
+ // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
+ // arbitrary 32. TODO: keep record of general size of results being returned.
+ List<Cell> values = new ArrayList<Cell>(32);
+ region.startRegionOperation(Operation.SCAN);
+ try {
+ int i = 0;
+ long before = EnvironmentEdgeManager.currentTime();
+ synchronized (scanner) {
+ boolean stale = (region.getRegionInfo().getReplicaId() != 0);
+ boolean clientHandlesPartials =
+ request.hasClientHandlesPartials() && request.getClientHandlesPartials();
+ boolean clientHandlesHeartbeats =
+ request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
+
+ // On the server side we must ensure that the correct ordering of partial results is
+ // returned to the client to allow them to properly reconstruct the partial results.
+ // If the coprocessor host is adding to the result list, we cannot guarantee the
+ // correct ordering of partial results and so we prevent partial results from being
+ // formed.
+ boolean serverGuaranteesOrderOfPartials = results.isEmpty();
+ boolean allowPartialResults =
+ clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
+ boolean moreRows = false;
+
+ // Heartbeat messages occur when the processing of the ScanRequest exceeds a
+ // certain time threshold on the server. When the time threshold is exceeded, the
+ // server stops the scan and sends back whatever Results it has accumulated within
+ // that time period (may be empty). Since heartbeat messages have the potential to
+ // create partial Results (in the event that the timeout occurs in the middle of a
+ // row), we must only generate heartbeat messages when the client can handle both
+ // heartbeats AND partials
+ boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
+
+ long timeLimit = getTimeLimit(controller, allowHeartbeatMessages);
+
+ final LimitScope sizeScope =
+ allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
+ final LimitScope timeScope =
+ allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
+
+ boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
+
+ // Configure with limits for this RPC. Set keep progress true since size progress
+ // towards size limit should be kept between calls to nextRaw
+ ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
+ contextBuilder.setSizeLimit(sizeScope, maxResultSize);
+ contextBuilder.setBatchLimit(scanner.getBatch());
+ contextBuilder.setTimeLimit(timeScope, timeLimit);
+ contextBuilder.setTrackMetrics(trackMetrics);
+ ScannerContext scannerContext = contextBuilder.build();
+ boolean limitReached = false;
+ while (i < rows) {
+ // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
+ // batch limit is a limit on the number of cells per Result. Thus, if progress is
+ // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
+ // reset the batch progress between nextRaw invocations since we don't want the
+ // batch progress from previous calls to affect future calls
+ scannerContext.setBatchProgress(0);
+
+ // Collect values to be returned here
+ moreRows = scanner.nextRaw(values, scannerContext);
+
+ if (!values.isEmpty()) {
+ final boolean partial = scannerContext.partialResultFormed();
+ Result r = Result.create(values, null, stale, partial);
+ lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
+ results.add(r);
+ i++;
}
- }
- boolean scannerClosed = false;
- try {
- // Remove lease while its being processed in server; protects against case
- // where processing of request takes > lease expiration time.
- lease = regionServer.leases.removeLease(scannerName);
- List<Result> results = new ArrayList<Result>();
- boolean done = false;
- // Call coprocessor. Get region info from scanner.
- if (region != null && region.getCoprocessorHost() != null) {
- Boolean bypass = region.getCoprocessorHost().preScannerNext(
- scanner, results, rows);
- if (!results.isEmpty()) {
- for (Result r : results) {
- lastBlock = addSize(context, r, lastBlock);
- }
+ boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
+ boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
+ boolean rowLimitReached = i >= rows;
+ limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
+
+ if (limitReached || !moreRows) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: " + moreRows
+ + " scannerContext: " + scannerContext);
}
- if (bypass != null && bypass.booleanValue()) {
- done = true;
+ // We only want to mark a ScanResponse as a heartbeat message in the event that
+ // there are more values to be read server side. If there aren't more values,
+ // marking it as a heartbeat is wasteful because the client will need to issue
+ // another ScanRequest only to realize that they already have all the values
+ if (moreRows) {
+ // Heartbeat messages occur when the time limit has been reached.
+ builder.setHeartbeatMessage(timeLimitReached);
}
+ break;
}
+ values.clear();
+ }
+ if (limitReached || moreRows) {
+ // We stopped prematurely
+ builder.setMoreResultsInRegion(true);
+ } else {
+ // We didn't get a single batch
+ builder.setMoreResultsInRegion(false);
+ }
- if (!done) {
- long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
- if (maxResultSize <= 0) {
- maxResultSize = maxQuotaResultSize;
- }
- // This is cells inside a row. Default size is 10 so if many versions or many cfs,
- // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
- // arbitrary 32. TODO: keep record of general size of results being returned.
- List<Cell> values = new ArrayList<Cell>(32);
- region.startRegionOperation(Operation.SCAN);
- try {
- int i = 0;
- long before = EnvironmentEdgeManager.currentTime();
- synchronized(scanner) {
- boolean stale = (region.getRegionInfo().getReplicaId() != 0);
- boolean clientHandlesPartials =
- request.hasClientHandlesPartials() && request.getClientHandlesPartials();
- boolean clientHandlesHeartbeats =
- request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
-
- // On the server side we must ensure that the correct ordering of partial results is
- // returned to the client to allow them to properly reconstruct the partial results.
- // If the coprocessor host is adding to the result list, we cannot guarantee the
- // correct ordering of partial results and so we prevent partial results from being
- // formed.
- boolean serverGuaranteesOrderOfPartials = results.isEmpty();
- boolean allowPartialResults =
- clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
- boolean moreRows = false;
-
- // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
- // certain time threshold on the server. When the time threshold is exceeded, the
- // server stops the scan and sends back whatever Results it has accumulated within
- // that time period (may be empty). Since heartbeat messages have the potential to
- // create partial Results (in the event that the timeout occurs in the middle of a
- // row), we must only generate heartbeat messages when the client can handle both
- // heartbeats AND partials
- boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
-
- // Default value of timeLimit is negative to indicate no timeLimit should be
- // enforced.
- long timeLimit = -1;
-
- // Set the time limit to be half of the more restrictive timeout value (one of the
- // timeout values must be positive). In the event that both values are positive, the
- // more restrictive of the two is used to calculate the limit.
- if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
- long timeLimitDelta;
- if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
- timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
- } else {
- timeLimitDelta =
- scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
- }
- if (controller != null) {
- if (controller instanceof HBaseRpcController) {
- HBaseRpcController pRpcController =
- (HBaseRpcController)controller;
- if (pRpcController.getCallTimeout() > 0) {
- timeLimitDelta = Math.min(timeLimitDelta, pRpcController.getCallTimeout());
- }
- } else {
- throw new UnsupportedOperationException("We only do " +
- "HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
- }
- }
- // Use half of whichever timeout value was more restrictive... But don't allow
- // the time limit to be less than the allowable minimum (could cause an
- // immediatate timeout before scanning any data).
- timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
- timeLimit = System.currentTimeMillis() + timeLimitDelta;
- }
-
- final LimitScope sizeScope =
- allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
- final LimitScope timeScope =
- allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
-
- boolean trackMetrics =
- request.hasTrackScanMetrics() && request.getTrackScanMetrics();
-
- // Configure with limits for this RPC. Set keep progress true since size progress
- // towards size limit should be kept between calls to nextRaw
- ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
- contextBuilder.setSizeLimit(sizeScope, maxResultSize);
- contextBuilder.setBatchLimit(scanner.getBatch());
- contextBuilder.setTimeLimit(timeScope, timeLimit);
- contextBuilder.setTrackMetrics(trackMetrics);
- ScannerContext scannerContext = contextBuilder.build();
- boolean limitReached = false;
- while (i < rows) {
- // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
- // batch limit is a limit on the number of cells per Result. Thus, if progress is
- // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
- // reset the batch progress between nextRaw invocations since we don't want the
- // batch progress from previous calls to affect future calls
- scannerContext.setBatchProgress(0);
-
- // Collect values to be returned here
- moreRows = scanner.nextRaw(values, scannerContext);
-
- if (!values.isEmpty()) {
- final boolean partial = scannerContext.partialResultFormed();
- Result r = Result.create(values, null, stale, partial);
- lastBlock = addSize(context, r, lastBlock);
- results.add(r);
- i++;
- }
-
- boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
- boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
- boolean rowLimitReached = i >= rows;
- limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
-
- if (limitReached || !moreRows) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
- + moreRows + " scannerContext: " + scannerContext);
- }
- // We only want to mark a ScanResponse as a heartbeat message in the event that
- // there are more values to be read server side. If there aren't more values,
- // marking it as a heartbeat is wasteful because the client will need to issue
- // another ScanRequest only to realize that they already have all the values
- if (moreRows) {
- // Heartbeat messages occur when the time limit has been reached.
- builder.setHeartbeatMessage(timeLimitReached);
- }
- break;
- }
- values.clear();
- }
-
- if (limitReached || moreRows) {
- // We stopped prematurely
- builder.setMoreResultsInRegion(true);
- } else {
- // We didn't get a single batch
- builder.setMoreResultsInRegion(false);
- }
-
- // Check to see if the client requested that we track metrics server side. If the
- // client requested metrics, retrieve the metrics from the scanner context.
- if (trackMetrics) {
- Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
- ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
- NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
-
- for (Entry<String, Long> entry : metrics.entrySet()) {
- pairBuilder.setName(entry.getKey());
- pairBuilder.setValue(entry.getValue());
- metricBuilder.addMetrics(pairBuilder.build());
- }
-
- builder.setScanMetrics(metricBuilder.build());
- }
- }
- region.updateReadRequestsCount(i);
- long end = EnvironmentEdgeManager.currentTime();
- long responseCellSize = context != null ? context.getResponseCellSize() : 0;
- region.getMetrics().updateScanTime(end - before);
- if (regionServer.metricsRegionServer != null) {
- regionServer.metricsRegionServer.updateScanSize(responseCellSize);
- regionServer.metricsRegionServer.updateScanTime(end - before);
- }
- } finally {
- region.closeRegionOperation();
- }
- // coprocessor postNext hook
- if (region != null && region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
- }
+ // Check to see if the client requested that we track metrics server side. If the
+ // client requested metrics, retrieve the metrics from the scanner context.
+ if (trackMetrics) {
+ Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
+ ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
+ NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
+
+ for (Entry<String, Long> entry : metrics.entrySet()) {
+ pairBuilder.setName(entry.getKey());
+ pairBuilder.setValue(entry.getValue());
+ metricBuilder.addMetrics(pairBuilder.build());
}
- quota.addScanResult(results);
+ builder.setScanMetrics(metricBuilder.build());
+ }
+ }
+ region.updateReadRequestsCount(i);
+ long end = EnvironmentEdgeManager.currentTime();
+ long responseCellSize = context != null ? context.getResponseCellSize() : 0;
+ region.getMetrics().updateScanTime(end - before);
+ if (regionServer.metricsRegionServer != null) {
+ regionServer.metricsRegionServer.updateScanSize(responseCellSize);
+ regionServer.metricsRegionServer.updateScanTime(end - before);
+ }
+ } finally {
+ region.closeRegionOperation();
+ }
+ // coprocessor postNext hook
+ if (region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
+ }
+ return builder.getMoreResultsInRegion();
+ }
- // If the scanner's filter - if any - is done with the scan
- // and wants to tell the client to stop the scan. This is done by passing
- // a null result, and setting moreResults to false.
- if (scanner.isFilterDone() && results.isEmpty()) {
- moreResults = false;
- results = null;
- } else {
- addResults(builder, results, controller,
- RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
- isClientCellBlockSupport(context));
- }
- } catch (IOException e) {
- // The scanner state might be left in a dirty state, so we will tell the Client to
- // fail this RPC and close the scanner while opening up another one from the start of
- // row that the client has last seen.
- closeScanner(region, scanner, scannerName, context);
- // scanner is closed here
- scannerClosed = true;
-
- // If it is a CorruptHFileException or a FileNotFoundException, throw the
- // DoNotRetryIOException. This can avoid the retry in ClientScanner.
- if (e instanceof CorruptHFileException || e instanceof FileNotFoundException) {
- throw new DoNotRetryIOException(e);
- }
- // We closed the scanner already. Instead of throwing the IOException, and client
- // retrying with the same scannerId only to get USE on the next RPC, we directly throw
- // a special exception to save an RPC.
- if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
- // 1.4.0+ clients know how to handle
- throw new ScannerResetException("Scanner is closed on the server-side", e);
- } else {
- // older clients do not know about SRE. Just throw USE, which they will handle
- throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
- + " scanner state for clients older than 1.3.", e);
- }
- } finally {
- // If the scanner is not closed, set the shipped callback
- if (!scannerClosed) {
- if (context != null) {
- context.setCallBack(rsh.shippedCallback);
+ /**
+ * Scan data in a table.
+ *
+ * @param controller the RPC controller
+ * @param request the scan request
+ * @throws ServiceException
+ */
+ @Override
+ public ScanResponse scan(final RpcController controller, final ScanRequest request)
+ throws ServiceException {
+ if (controller != null && !(controller instanceof HBaseRpcController)) {
+ throw new UnsupportedOperationException(
+          "We only do HBaseRpcControllers! FIX IF A PROBLEM: " + controller);
+ }
+ if (!request.hasScannerId() && !request.hasScan()) {
+ throw new ServiceException(
+ new DoNotRetryIOException("Missing required input: scannerId or scan"));
+ }
+ try {
+ checkOpen();
+ } catch (IOException e) {
+ if (request.hasScannerId()) {
+ String scannerName = Long.toString(request.getScannerId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Server shutting down and client tried to access missing scanner " + scannerName);
+ }
+ if (regionServer.leases != null) {
+ try {
+ regionServer.leases.cancelLease(scannerName);
+ } catch (LeaseException le) {
+ // No problem, ignore
+ if (LOG.isTraceEnabled()) {
+              LOG.trace("Unable to cancel lease of scanner. It may already be closed.");
}
-
- // Adding resets expiration time on lease.
- if (scanners.containsKey(scannerName)) {
- ttl = this.scannerLeaseTimeoutPeriod;
- // When context != null, adding back the lease will be done in callback set above.
- if (context == null) {
- if (lease != null) regionServer.leases.addLease(lease);
- }
+ }
+ }
+ }
+ throw new ServiceException(e);
+ }
+ requestCount.increment();
+ rpcScanRequestCount.increment();
+ RegionScannerHolder rsh;
+ ScanResponse.Builder builder = ScanResponse.newBuilder();
+ boolean isSmallScan;
+ try {
+ if (request.hasScannerId()) {
+ rsh = getRegionScanner(request);
+ isSmallScan = false;
+ } else {
+ Pair<RegionScannerHolder, Boolean> pair = newRegionScanner(request, builder);
+ rsh = pair.getFirst();
+ isSmallScan = pair.getSecond().booleanValue();
+ }
+ } catch (IOException e) {
+ if (e == SCANNER_ALREADY_CLOSED) {
+        // Now we close the scanner automatically when there are no more results for this region,
+        // but an old client will still send a close request to us. Just ignore it and return.
+ return builder.build();
+ }
+ throw new ServiceException(e);
+ }
+ Region region = rsh.r;
+ String scannerName = rsh.scannerName;
+ Leases.Lease lease;
+ try {
+      // Remove the lease while the request is being processed on the server; this protects
+      // against the case where processing takes longer than the lease expiration time.
+ lease = regionServer.leases.removeLease(scannerName);
+ } catch (LeaseException e) {
+ throw new ServiceException(e);
+ }
+ if (request.hasRenew() && request.getRenew()) {
+ // add back and return
+ addScannerLeaseBack(lease);
+ try {
+ checkScanNextCallSeq(request, rsh);
+ } catch (OutOfOrderScannerNextException e) {
+ throw new ServiceException(e);
+ }
+ return builder.build();
+ }
+ OperationQuota quota;
+ try {
+ quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
+ } catch (IOException e) {
+ addScannerLeaseBack(lease);
+ throw new ServiceException(e);
+    }
+ try {
+ checkScanNextCallSeq(request, rsh);
+ } catch (OutOfOrderScannerNextException e) {
+ addScannerLeaseBack(lease);
+ throw new ServiceException(e);
+ }
+    // Now we have increased the next call sequence. If we give the client an error, the retry
+    // will never succeed, so we had better close the scanner and return a DoNotRetryIOException
+    // to the client, which will then open a new scanner.
+ boolean closeScanner = request.hasCloseScanner() ? request.getCloseScanner() : false;
+ int rows; // this is scan.getCaching
+ if (request.hasNumberOfRows()) {
+ rows = request.getNumberOfRows();
+ } else {
+ rows = closeScanner ? 0 : 1;
+ }
+ RpcCallContext context = RpcServer.getCurrentCall();
+ // now let's do the real scan.
+ long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
+ RegionScanner scanner = rsh.s;
+ boolean moreResults = true;
+ boolean moreResultsInRegion = true;
+ MutableObject lastBlock = new MutableObject();
+ boolean scannerClosed = false;
+ try {
+ List<Result> results = new ArrayList<>();
+ if (rows > 0) {
+ boolean done = false;
+ // Call coprocessor. Get region info from scanner.
+ if (region.getCoprocessorHost() != null) {
+ Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
+ if (!results.isEmpty()) {
+ for (Result r : results) {
+ lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
}
}
+ if (bypass != null && bypass.booleanValue()) {
+ done = true;
+ }
+ }
+ if (!done) {
+ moreResultsInRegion = scan((HBaseRpcController) controller, request, rsh, isSmallScan,
+ maxQuotaResultSize, rows, results, builder, lastBlock, context);
}
}
- if (!moreResults || closeScanner) {
- ttl = 0;
+ quota.addScanResult(results);
+
+ if (scanner.isFilterDone() && results.isEmpty()) {
+        // The scanner's filter - if any - is done with the scan, so tell the client to stop.
+        // Only set moreResults to false when results is empty, to stay compatible with the old
+        // scan implementation where the returned results are simply ignored whenever moreResults
+        // is false. The isEmpty check can be removed once the old implementation is gone.
moreResults = false;
- if (closeScanner(region, scanner, scannerName, context)) {
- return builder.build(); // bypass
- }
}
-
- if (ttl > 0) {
- builder.setTtl(ttl);
+ addResults(builder, results, (HBaseRpcController) controller,
+ RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
+ isClientCellBlockSupport(context));
+ if (!moreResults || !moreResultsInRegion || closeScanner) {
+ scannerClosed = true;
+ closeScanner(region, scanner, scannerName, context);
}
- builder.setScannerId(scannerId);
builder.setMoreResults(moreResults);
return builder.build();
- } catch (IOException ie) {
- if (scannerName != null && ie instanceof NotServingRegionException) {
- RegionScannerHolder rsh = scanners.remove(scannerName);
- if (rsh != null) {
- try {
- RegionScanner scanner = rsh.s;
- LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
- scanner.close();
- regionServer.leases.cancelLease(scannerName);
- } catch (IOException e) {
- LOG.warn("Getting exception closing " + scannerName, e);
- }
+ } catch (Exception e) {
+ try {
+        // Mark the scanner as closed; the finally block below must not add the lease back.
+        scannerClosed = true;
+        // The scanner state might have been left dirty, so tell the client to fail this RPC and
+        // close the scanner, then open a new one starting from the row it last saw.
+ closeScanner(region, scanner, scannerName, context);
+
+        // If it is a CorruptHFileException or a FileNotFoundException, throw a
+        // DoNotRetryIOException so that the ClientScanner does not retry.
+ if (e instanceof CorruptHFileException || e instanceof FileNotFoundException) {
+ throw new DoNotRetryIOException(e);
}
+        // We have already closed the scanner. Instead of throwing the IOException and having the
+        // client retry with the same scannerId, only to get an UnknownScannerException on the
+        // next RPC, we directly throw a special exception to save an RPC.
+ if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
+ // 1.4.0+ clients know how to handle
+ throw new ScannerResetException("Scanner is closed on the server-side", e);
+ } else {
+ // older clients do not know about SRE. Just throw USE, which they will handle
+        // Older clients do not know about ScannerResetException. Just throw
+        // UnknownScannerException, which they will handle.
+        throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
+            + " scanner state for clients older than 1.4.", e);
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
}
- throw new ServiceException(ie);
} finally {
- if (quota != null) {
- quota.close();
+ if (!scannerClosed) {
+        // Adding the lease back resets its expiration time.
+        // The closeCallBack is set in closeScanner, so here we only care about the shippedCallback.
+ if (context != null) {
+ context.setCallBack(rsh.shippedCallback);
+ } else {
+          // When context != null, the shipped callback set above adds the lease back; since there
+          // is no callback here, add it back directly.
+ addScannerLeaseBack(lease);
+ }
}
+ quota.close();
}
}
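
The lease handling in scan() above follows a remove-then-restore pattern: the lease is removed
while the RPC is being processed so it cannot expire mid-call, and it is only added back
(directly, or via the shipped callback once the response has been sent) if the scanner stays
open. A self-contained sketch of that pattern, using hypothetical names rather than the HBase
Leases API, might look like this:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

public class LeaseGuardSketch {

  // lease name -> absolute expiration timestamp in milliseconds
  private final Map<String, Long> leases = new ConcurrentHashMap<>();

  /** Registers a lease that expires leaseTimeoutMs from now. */
  public void addLease(String name, long leaseTimeoutMs) {
    leases.put(name, System.currentTimeMillis() + leaseTimeoutMs);
  }

  /**
   * Runs the supplied work with the lease removed so a slow call cannot be expired underneath it.
   * The work returns true if the guarded resource is still open, in which case the lease is added
   * back, which resets its expiration time.
   */
  public void runGuarded(String name, long leaseTimeoutMs, BooleanSupplier work) {
    Long expiry = leases.remove(name);
    if (expiry == null) {
      throw new IllegalStateException("Unknown or already expired lease: " + name);
    }
    boolean stillOpen = true;
    try {
      stillOpen = work.getAsBoolean();
    } finally {
      if (stillOpen) {
        addLease(name, leaseTimeoutMs);
      }
    }
  }
}
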
- private boolean closeScanner(Region region, RegionScanner scanner, String scannerName,
+ private void closeScanner(Region region, RegionScanner scanner, String scannerName,
RpcCallContext context) throws IOException {
- if (region != null && region.getCoprocessorHost() != null) {
+ if (region.getCoprocessorHost() != null) {
if (region.getCoprocessorHost().preScannerClose(scanner)) {
- return true; // bypass
+ // bypass the actual close.
+ return;
}
}
RegionScannerHolder rsh = scanners.remove(scannerName);
@@ -3057,19 +3115,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} else {
rsh.s.close();
}
- try {
- regionServer.leases.cancelLease(scannerName);
- } catch (LeaseException le) {
- // No problem, ignore
- if (LOG.isTraceEnabled()) {
- LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
- }
- }
- if (region != null && region.getCoprocessorHost() != null) {
+ if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(scanner);
}
}
- return false;
}
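
The scan path above relies on checkScanNextCallSeq to reject retried or out-of-order next
requests on the same scanner. Purely as an illustration (a hypothetical class, not the
RSRpcServices implementation), the core of such a check is a per-scanner counter that the client
echoes back and the server validates and advances:

import java.util.concurrent.atomic.AtomicLong;

public class NextCallSeqSketch {

  // expected sequence number of the next "scan next" call for a single scanner
  private final AtomicLong nextCallSeq = new AtomicLong(0);

  /**
   * Validates the sequence number sent by the client and advances the expected value. A mismatch
   * usually means the client retried an RPC whose response was already consumed, so the results
   * it would receive are not the ones it expects.
   */
  public void checkAndAdvance(long clientCallSeq) {
    long expected = nextCallSeq.get();
    if (clientCallSeq != expected) {
      throw new IllegalStateException(
          "Out of order scan next call: expected " + expected + " but got " + clientCallSeq);
    }
    nextCallSeq.incrementAndGet();
  }
}
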
@Override