You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2016/09/23 01:47:47 UTC
[1/3] hbase git commit: HBASE-16604 Scanner retries on IOException
can cause the scans to miss data
Repository: hbase
Updated Branches:
refs/heads/branch-1.1 97ce640f5 -> 9d424c20c
refs/heads/branch-1.2 32a7f2c4b -> d307ad19f
refs/heads/branch-1.3 73d4edbfa -> d600e8b70
HBASE-16604 Scanner retries on IOException can cause the scans to miss data
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d600e8b7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d600e8b7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d600e8b7
Branch: refs/heads/branch-1.3
Commit: d600e8b70e10281ec19e3316ca0fd461d824a018
Parents: 73d4edb
Author: Enis Soztutar <en...@apache.org>
Authored: Thu Sep 22 17:41:01 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Thu Sep 22 17:41:27 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/UnknownScannerException.java | 4 +
.../hadoop/hbase/client/ClientScanner.java | 4 +-
.../hadoop/hbase/client/ScannerCallable.java | 26 +++--
.../hbase/exceptions/ScannerResetException.java | 50 +++++++++
.../hbase/ipc/MetricsHBaseServerSource.java | 2 +
.../hbase/ipc/MetricsHBaseServerSourceImpl.java | 8 ++
.../hadoop/hbase/ipc/MetricsHBaseServer.java | 3 +
.../hbase/regionserver/RSRpcServices.java | 63 +++++++----
.../hadoop/hbase/HBaseTestingUtility.java | 18 ++-
.../hadoop/hbase/client/TestFromClientSide.java | 77 ++++++++++++-
.../hbase/client/TestTableSnapshotScanner.java | 2 +-
.../TableSnapshotInputFormatTestBase.java | 2 +-
.../mapreduce/TestMultithreadedTableMapper.java | 3 +-
.../hbase/mapreduce/TestTableMapReduce.java | 4 +-
.../hbase/mapreduce/TestTableMapReduceBase.java | 2 +-
.../regionserver/DelegatingKeyValueScanner.java | 109 +++++++++++++++++++
16 files changed, 337 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
index b951221..3e7b22d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
@@ -45,4 +45,8 @@ public class UnknownScannerException extends DoNotRetryIOException {
public UnknownScannerException(String s) {
super(s);
}
+
+ public UnknownScannerException(String s, Exception e) {
+ super(s, e);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 1a9df97..944f44e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
@@ -433,7 +434,8 @@ public class ClientScanner extends AbstractClientScanner {
if ((cause != null && cause instanceof NotServingRegionException) ||
(cause != null && cause instanceof RegionServerStoppedException) ||
e instanceof OutOfOrderScannerNextException ||
- e instanceof UnknownScannerException ) {
+ e instanceof UnknownScannerException ||
+ e instanceof ScannerResetException) {
// Pass. It is easier writing the if loop test as list of what is allowed rather than
// as a list of what is not allowed... so if in here, it means we do not throw.
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 8912e58..7d3b9e9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -107,9 +108,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
* @param connection which connection
* @param tableName table callable is on
* @param scan the scan to execute
- * @param scanMetrics the ScanMetrics to used, if it is null,
+ * @param scanMetrics the ScanMetrics to used, if it is null,
* ScannerCallable won't collect metrics
- * @param rpcControllerFactory factory to use when creating
+ * @param rpcControllerFactory factory to use when creating
* {@link com.google.protobuf.RpcController}
*/
public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
@@ -275,14 +276,19 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
if (e instanceof RemoteException) {
ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
}
- if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
- try {
- HRegionLocation location =
- getConnection().relocateRegion(getTableName(), scan.getStartRow());
- LOG.info("Scanner=" + scannerId
- + " expired, current region location is " + location.toString());
- } catch (Throwable t) {
- LOG.info("Failed to relocate region", t);
+ if (logScannerActivity) {
+ if (ioe instanceof UnknownScannerException) {
+ try {
+ HRegionLocation location =
+ getConnection().relocateRegion(getTableName(), scan.getStartRow());
+ LOG.info("Scanner=" + scannerId
+ + " expired, current region location is " + location.toString());
+ } catch (Throwable t) {
+ LOG.info("Failed to relocate region", t);
+ }
+ } else if (ioe instanceof ScannerResetException) {
+ LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
+ + "asked us to reset the scanner state.", ioe);
}
}
// The below convertion of exceptions into DoNotRetryExceptions is a little strange.
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
new file mode 100644
index 0000000..7689eb1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.exceptions;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown when the server side has received an Exception and asks the Client to reset the scanner
+ * state by closing the current region scanner and reopening from the start of the last seen row.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ScannerResetException extends DoNotRetryIOException {
+ private static final long serialVersionUID = -5649728171144849619L;
+
+ /** constructor */
+ public ScannerResetException() {
+ super();
+ }
+
+ /**
+ * Constructor
+ * @param s message
+ */
+ public ScannerResetException(String s) {
+ super(s);
+ }
+
+ public ScannerResetException(String s, Exception e) {
+ super(s, e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 3305f65..ac14bd8 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -77,6 +77,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
String EXCEPTIONS_OOO_NAME="exceptions.OutOfOrderScannerNextException";
String EXCEPTIONS_BUSY_NAME="exceptions.RegionTooBusyException";
String EXCEPTIONS_UNKNOWN_NAME="exceptions.UnknownScannerException";
+ String EXCEPTIONS_SCANNER_RESET_NAME="exceptions.ScannerResetException";
String EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException";
String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException";
String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException";
@@ -106,6 +107,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
void movedRegionException();
void notServingRegionException();
void unknownScannerException();
+ void scannerResetException();
void tooBusyException();
void multiActionTooLargeException();
void callQueueTooBigException();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index fafa9d0..d372b1b 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -45,6 +45,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
private final MutableFastCounter exceptionsOOO;
private final MutableFastCounter exceptionsBusy;
private final MutableFastCounter exceptionsUnknown;
+ private final MutableFastCounter exceptionsScannerReset;
private final MutableFastCounter exceptionsSanity;
private final MutableFastCounter exceptionsNSRE;
private final MutableFastCounter exceptionsMoved;
@@ -78,6 +79,8 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
.newCounter(EXCEPTIONS_BUSY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsUnknown = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_UNKNOWN_NAME, EXCEPTIONS_TYPE_DESC, 0L);
+ this.exceptionsScannerReset = this.getMetricsRegistry()
+ .newCounter(EXCEPTIONS_SCANNER_RESET_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsSanity = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_SANITY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsMoved = this.getMetricsRegistry()
@@ -162,6 +165,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
}
@Override
+ public void scannerResetException() {
+ exceptionsScannerReset.incr();
+ }
+
+ @Override
public void tooBusyException() {
exceptionsBusy.incr();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
index 838bdf6..fe03d4f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@InterfaceAudience.Private
public class MetricsHBaseServer {
@@ -103,6 +104,8 @@ public class MetricsHBaseServer {
source.tooBusyException();
} else if (throwable instanceof UnknownScannerException) {
source.unknownScannerException();
+ } else if (throwable instanceof ScannerResetException) {
+ source.scannerResetException();
} else if (throwable instanceof RegionMovedException) {
source.movedRegionException();
} else if (throwable instanceof NotServingRegionException) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 135bf6a..e5caefd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -2730,13 +2731,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
}
} catch (IOException e) {
- // if we have an exception on scanner next and we are using the callSeq
- // we should rollback because the client will retry with the same callSeq
- // and get an OutOfOrderScannerNextException if we don't do so.
- if (rsh != null && request.hasNextCallSeq()) {
- rsh.rollbackNextCallSeq();
+ // The scanner might be left in a dirty state, so we tell the Client to fail this RPC,
+ // close the scanner, and open up another one from the start of the row that the
+ // client has last seen.
+ closeScanner(region, scanner, scannerName);
+
+ // We closed the scanner already. Instead of rethrowing the IOException and having the
+ // client retry with the same scannerId only to get an UnknownScannerException (USE) on
+ // the next RPC, we directly throw a special exception to save an RPC.
+ if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
+ // 1.4.0+ clients know how to handle
+ throw new ScannerResetException("Scanner is closed on the server-side", e);
+ } else {
+ // older clients do not know about SRE. Just throw USE, which they will handle
+ throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
+ + " scanner state for clients older than 1.3.", e);
}
- throw e;
} finally {
// We're done. On way out re-add the above removed lease.
// Adding resets expiration time on lease.
@@ -2750,20 +2760,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (!moreResults || closeScanner) {
ttl = 0;
moreResults = false;
- if (region != null && region.getCoprocessorHost() != null) {
- if (region.getCoprocessorHost().preScannerClose(scanner)) {
- return builder.build(); // bypass
- }
- }
- rsh = scanners.remove(scannerName);
- if (rsh != null) {
- scanner = rsh.s;
- scanner.close();
- regionServer.leases.cancelLease(scannerName);
- if (region != null && region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postScannerClose(scanner);
- }
- }
+ closeScanner(region, scanner, scannerName);
}
if (ttl > 0) {
@@ -2794,6 +2791,32 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
+ private boolean closeScanner(Region region, RegionScanner scanner, String scannerName)
+ throws IOException {
+ if (region != null && region.getCoprocessorHost() != null) {
+ if (region.getCoprocessorHost().preScannerClose(scanner)) {
+ return true; // bypass
+ }
+ }
+ RegionScannerHolder rsh = scanners.remove(scannerName);
+ if (rsh != null) {
+ scanner = rsh.s;
+ scanner.close();
+ try {
+ regionServer.leases.cancelLease(scannerName);
+ } catch (LeaseException le) {
+ // No problem, ignore
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
+ }
+ }
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postScannerClose(scanner);
+ }
+ }
+ return false;
+ }
+
@Override
public CoprocessorServiceResponse execRegionServerService(RpcController controller,
CoprocessorServiceRequest request) throws ServiceException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 0f6281e..5477527 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -2007,6 +2007,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return HRegion.createHRegion(info, rootDir, conf, htd);
}
+ public HTableDescriptor createTableDescriptor(final TableName tableName,
+ byte[] family) {
+ return createTableDescriptor(tableName, new byte[][] {family}, 1);
+ }
+
+ public HTableDescriptor createTableDescriptor(final TableName tableName,
+ byte[][] families, int maxVersions) {
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ for (byte[] family : families) {
+ HColumnDescriptor hcd = new HColumnDescriptor(family)
+ .setMaxVersions(maxVersions);
+ desc.addFamily(hcd);
+ }
+ return desc;
+ }
+
/**
* Create an HRegion that writes to the local tmp dirs
* @param desc
@@ -2221,7 +2237,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
Put put = new Put(row);
put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
for (int i = 0; i < f.length; i++) {
- put.add(f[i], null, value != null ? value : row);
+ put.add(f[i], f[i], value != null ? value : row);
}
puts.add(put);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index fc47ae2..039c781 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -38,13 +37,14 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.NavigableSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.mapreduce.Cluster;
import org.apache.log4j.Level;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
@@ -66,8 +66,11 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -93,11 +96,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -632,6 +638,71 @@ public class TestFromClientSide {
assertEquals(rowCount - endKeyCount, countGreater);
}
+ /**
+ * This is a coprocessor to inject a test failure so that a store scanner.reseek() call will
+ * fail with an IOException() on the first call.
+ */
+ public static class ExceptionInReseekRegionObserver extends BaseRegionObserver {
+ static AtomicLong reqCount = new AtomicLong(0);
+ class MyStoreScanner extends StoreScanner {
+ public MyStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
+ long readPt) throws IOException {
+ super(store, scanInfo, scan, columns, readPt);
+ }
+
+ @Override
+ protected List<KeyValueScanner> selectScannersFrom(
+ List<? extends KeyValueScanner> allScanners) {
+ List<KeyValueScanner> scanners = super.selectScannersFrom(allScanners);
+ List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
+ for (KeyValueScanner scanner : scanners) {
+ newScanners.add(new DelegatingKeyValueScanner(scanner) {
+ @Override
+ public boolean reseek(Cell key) throws IOException {
+ if (reqCount.incrementAndGet() == 1) {
+ throw new IOException("Injected exception");
+ }
+ return super.reseek(key);
+ }
+ });
+ }
+ return newScanners;
+ }
+ }
+
+ @Override
+ public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s)
+ throws IOException {
+ return new MyStoreScanner(store, store.getScanInfo(), scan, targetCols, Long.MAX_VALUE);
+ }
+ }
+
+ /**
+ * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek,
+ * leaving the server-side RegionScanner in a dirty state. The client has to ensure that the
+ * ClientScanner does not get an exception and still sees all the data.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testClientScannerIsResetWhenScanThrowsIOException()
+ throws IOException, InterruptedException {
+ TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
+ TableName name = TableName.valueOf("testClientScannerIsResetWhenScanThrowsIOException");
+
+ HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
+ htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
+ TEST_UTIL.getHBaseAdmin().createTable(htd);
+ try (Table t = TEST_UTIL.getConnection().getTable(name)) {
+ int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
+ TEST_UTIL.getHBaseAdmin().flush(name);
+ int actualRowCount = countRows(t, new Scan().addColumn(FAMILY, FAMILY));
+ assertEquals(rowCount, actualRowCount);
+ }
+ assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
+ }
+
/*
* @param key
* @return Scan with RowFilter that does LESS than passed key.
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
index 3e915e1..0e2b670 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
@@ -183,7 +183,7 @@ public class TestTableSnapshotScanner {
}
for (int j = 0; j < FAMILIES.length; j++) {
- byte[] actual = result.getValue(FAMILIES[j], null);
+ byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ " ,actual:" + Bytes.toString(actual), row, actual);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
index 7d1267a..3df4a8f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
@@ -182,7 +182,7 @@ public abstract class TableSnapshotInputFormatTestBase {
}
for (int j = 0; j < FAMILIES.length; j++) {
- byte[] actual = result.getValue(FAMILIES[j], null);
+ byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ " ,actual:" + Bytes.toString(actual), row, actual);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
index 9a81990..1cd2432 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
@@ -100,6 +100,7 @@ public class TestMultithreadedTableMapper {
* @param context
* @throws IOException
*/
+ @Override
public void map(ImmutableBytesWritable key, Result value,
Context context)
throws IOException, InterruptedException {
@@ -113,7 +114,7 @@ public class TestMultithreadedTableMapper {
Bytes.toString(INPUT_FAMILY) + "'.");
}
// Get the original value and reverse it
- String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
// Now set the value to be collected
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
index 6fb9460..fde1cd9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
@@ -51,6 +51,7 @@ import org.junit.experimental.categories.Category;
public class TestTableMapReduce extends TestTableMapReduceBase {
private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
+ @Override
protected Log getLog() { return LOG; }
/**
@@ -66,6 +67,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
* @param context
* @throws IOException
*/
+ @Override
public void map(ImmutableBytesWritable key, Result value,
Context context)
throws IOException, InterruptedException {
@@ -80,7 +82,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
}
// Get the original value and reverse it
- String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
// Now set the value to be collected
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
index 022d4c9..9995eb4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
@@ -125,7 +125,7 @@ public abstract class TestTableMapReduceBase {
// Get the original value and reverse it
- String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d600e8b7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
new file mode 100644
index 0000000..10432b9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+/**
+ * A {@link KeyValueScanner} that forwards every call to the wrapped {@code delegate} scanner and
+ * returns whatever the delegate returns. The class is not final, so a subclass can override a
+ * single method while every other call still goes straight to the delegate.
+ */
+public class DelegatingKeyValueScanner implements KeyValueScanner {
+ // The wrapped scanner that receives every call; protected, so visible to subclasses.
+ protected KeyValueScanner delegate;
+
+ /**
+ * @param delegate the scanner that all calls are forwarded to
+ */
+ public DelegatingKeyValueScanner(KeyValueScanner delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public Cell peek() {
+ return delegate.peek();
+ }
+
+ @Override
+ public Cell next() throws IOException {
+ return delegate.next();
+ }
+
+ @Override
+ public boolean seek(Cell key) throws IOException {
+ return delegate.seek(key);
+ }
+
+ @Override
+ public boolean reseek(Cell key) throws IOException {
+ return delegate.reseek(key);
+ }
+
+ @Override
+ public long getScannerOrder() {
+ return delegate.getScannerOrder();
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+
+ @Override
+ public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
+ return delegate.shouldUseScanner(scan, store, oldestUnexpiredTS);
+ }
+
+ @Override
+ public boolean requestSeek(Cell kv, boolean forward, boolean useBloom) throws IOException {
+ return delegate.requestSeek(kv, forward, useBloom);
+ }
+
+ @Override
+ public boolean realSeekDone() {
+ return delegate.realSeekDone();
+ }
+
+ @Override
+ public void enforceSeek() throws IOException {
+ delegate.enforceSeek();
+ }
+
+ @Override
+ public boolean isFileScanner() {
+ return delegate.isFileScanner();
+ }
+
+ @Override
+ public boolean backwardSeek(Cell key) throws IOException {
+ return delegate.backwardSeek(key);
+ }
+
+ @Override
+ public boolean seekToPreviousRow(Cell key) throws IOException {
+ return delegate.seekToPreviousRow(key);
+ }
+
+ @Override
+ public boolean seekToLastRow() throws IOException {
+ return delegate.seekToLastRow();
+ }
+
+ @Override
+ public Cell getNextIndexedKey() {
+ return delegate.getNextIndexedKey();
+ }
+}
\ No newline at end of file
[2/3] hbase git commit: HBASE-16604 Scanner retries on IOException
can cause the scans to miss data
Posted by en...@apache.org.
HBASE-16604 Scanner retries on IOException can cause the scans to miss data
Conflicts:
hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d307ad19
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d307ad19
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d307ad19
Branch: refs/heads/branch-1.2
Commit: d307ad19fec359fab166f0a6271d6460324f1c16
Parents: 32a7f2c
Author: Enis Soztutar <en...@apache.org>
Authored: Thu Sep 22 17:41:01 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Thu Sep 22 18:28:06 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/UnknownScannerException.java | 4 +
.../hadoop/hbase/client/ClientScanner.java | 4 +-
.../hadoop/hbase/client/ScannerCallable.java | 26 +++--
.../hbase/exceptions/ScannerResetException.java | 50 +++++++++
.../hbase/ipc/MetricsHBaseServerSource.java | 2 +
.../hbase/ipc/MetricsHBaseServerSourceImpl.java | 8 ++
.../hadoop/hbase/ipc/MetricsHBaseServer.java | 3 +
.../hbase/regionserver/RSRpcServices.java | 64 +++++++----
.../hadoop/hbase/HBaseTestingUtility.java | 18 ++-
.../hadoop/hbase/client/TestFromClientSide.java | 77 ++++++++++++-
.../hbase/client/TestTableSnapshotScanner.java | 2 +-
.../TableSnapshotInputFormatTestBase.java | 2 +-
.../mapreduce/TestMultithreadedTableMapper.java | 3 +-
.../hbase/mapreduce/TestTableMapReduce.java | 4 +-
.../hbase/mapreduce/TestTableMapReduceBase.java | 2 +-
.../regionserver/DelegatingKeyValueScanner.java | 109 +++++++++++++++++++
16 files changed, 338 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
index b951221..3e7b22d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
@@ -45,4 +45,8 @@ public class UnknownScannerException extends DoNotRetryIOException {
public UnknownScannerException(String s) {
super(s);
}
+
+ /**
+ * Constructor
+ * @param s message
+ * @param e exception handed to the superclass constructor together with the message
+ */
+ public UnknownScannerException(String s, Exception e) {
+ super(s, e);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 1a9df97..944f44e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
@@ -433,7 +434,8 @@ public class ClientScanner extends AbstractClientScanner {
if ((cause != null && cause instanceof NotServingRegionException) ||
(cause != null && cause instanceof RegionServerStoppedException) ||
e instanceof OutOfOrderScannerNextException ||
- e instanceof UnknownScannerException ) {
+ e instanceof UnknownScannerException ||
+ e instanceof ScannerResetException) {
// Pass. It is easier writing the if loop test as list of what is allowed rather than
// as a list of what is not allowed... so if in here, it means we do not throw.
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 5100314..d723370 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -107,9 +108,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
* @param connection which connection
* @param tableName table callable is on
* @param scan the scan to execute
- * @param scanMetrics the ScanMetrics to used, if it is null,
+ * @param scanMetrics the ScanMetrics to used, if it is null,
* ScannerCallable won't collect metrics
- * @param rpcControllerFactory factory to use when creating
+ * @param rpcControllerFactory factory to use when creating
* {@link com.google.protobuf.RpcController}
*/
public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
@@ -271,14 +272,19 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
if (e instanceof RemoteException) {
ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
}
- if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
- try {
- HRegionLocation location =
- getConnection().relocateRegion(getTableName(), scan.getStartRow());
- LOG.info("Scanner=" + scannerId
- + " expired, current region location is " + location.toString());
- } catch (Throwable t) {
- LOG.info("Failed to relocate region", t);
+ if (logScannerActivity) {
+ if (ioe instanceof UnknownScannerException) {
+ try {
+ HRegionLocation location =
+ getConnection().relocateRegion(getTableName(), scan.getStartRow());
+ LOG.info("Scanner=" + scannerId
+ + " expired, current region location is " + location.toString());
+ } catch (Throwable t) {
+ LOG.info("Failed to relocate region", t);
+ }
+ } else if (ioe instanceof ScannerResetException) {
+ LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
+ + "asked us to reset the scanner state.", ioe);
}
}
// The below convertion of exceptions into DoNotRetryExceptions is a little strange.
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
new file mode 100644
index 0000000..7689eb1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.exceptions;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown when the server side has received an Exception, and asks the Client to reset the scanner
+ * state by closing the current region scanner, and reopening from the start of last seen row.
+ * This is a {@link DoNotRetryIOException} subclass.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ScannerResetException extends DoNotRetryIOException {
+ private static final long serialVersionUID = -5649728171144849619L;
+
+ /** Default constructor; delegates to the no-argument superclass constructor. */
+ public ScannerResetException() {
+ super();
+ }
+
+ /**
+ * Constructor
+ * @param s message
+ */
+ public ScannerResetException(String s) {
+ super(s);
+ }
+
+ /**
+ * Constructor
+ * @param s message
+ * @param e exception handed to the superclass constructor together with the message
+ */
+ public ScannerResetException(String s, Exception e) {
+ super(s, e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 061a672..80ce984 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -71,6 +71,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
String EXCEPTIONS_OOO_NAME="exceptions.OutOfOrderScannerNextException";
String EXCEPTIONS_BUSY_NAME="exceptions.RegionTooBusyException";
String EXCEPTIONS_UNKNOWN_NAME="exceptions.UnknownScannerException";
+ String EXCEPTIONS_SCANNER_RESET_NAME="exceptions.ScannerResetException";
String EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException";
String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException";
String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException";
@@ -98,6 +99,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
void movedRegionException();
void notServingRegionException();
void unknownScannerException();
+ void scannerResetException();
void tooBusyException();
void multiActionTooLargeException();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index ac83588..566c25a 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -45,6 +45,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
private final MutableCounterLong exceptionsOOO;
private final MutableCounterLong exceptionsBusy;
private final MutableCounterLong exceptionsUnknown;
+ private final MutableCounterLong exceptionsScannerReset;
private final MutableCounterLong exceptionsSanity;
private final MutableCounterLong exceptionsNSRE;
private final MutableCounterLong exceptionsMoved;
@@ -77,6 +78,8 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
.newCounter(EXCEPTIONS_BUSY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsUnknown = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_UNKNOWN_NAME, EXCEPTIONS_TYPE_DESC, 0L);
+ this.exceptionsScannerReset = this.getMetricsRegistry()
+ .newCounter(EXCEPTIONS_SCANNER_RESET_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsSanity = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_SANITY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsMoved = this.getMetricsRegistry()
@@ -159,6 +162,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
}
@Override
+ public void scannerResetException() {
+ exceptionsScannerReset.incr();
+ }
+
+ @Override
public void tooBusyException() {
exceptionsBusy.incr();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
index e514f5f..47642a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@InterfaceAudience.Private
public class MetricsHBaseServer {
@@ -102,6 +103,8 @@ public class MetricsHBaseServer {
source.tooBusyException();
} else if (throwable instanceof UnknownScannerException) {
source.unknownScannerException();
+ } else if (throwable instanceof ScannerResetException) {
+ source.scannerResetException();
} else if (throwable instanceof RegionMovedException) {
source.movedRegionException();
} else if (throwable instanceof NotServingRegionException) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 52d28e5..0d79ca9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
@@ -75,6 +76,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -2640,13 +2642,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
}
} catch (IOException e) {
- // if we have an exception on scanner next and we are using the callSeq
- // we should rollback because the client will retry with the same callSeq
- // and get an OutOfOrderScannerNextException if we don't do so.
- if (rsh != null && request.hasNextCallSeq()) {
- rsh.rollbackNextCallSeq();
+ // The scanner state might be left in a dirty state, so we will tell the Client to
+ // fail this RPC and close the scanner while opening up another one from the start of
+ // row that the client has last seen.
+ closeScanner(region, scanner, scannerName);
+
+ // We closed the scanner already. Instead of throwing the IOException, and client
+ // retrying with the same scannerId only to get USE on the next RPC, we directly throw
+ // a special exception to save an RPC.
+ if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
+ // 1.4.0+ clients know how to handle
+ throw new ScannerResetException("Scanner is closed on the server-side", e);
+ } else {
+ // older clients do not know about SRE. Just throw USE, which they will handle
+ throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
+ + " scanner state for clients older than 1.3.", e);
}
- throw e;
} finally {
// We're done. On way out re-add the above removed lease.
// Adding resets expiration time on lease.
@@ -2660,20 +2671,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (!moreResults || closeScanner) {
ttl = 0;
moreResults = false;
- if (region != null && region.getCoprocessorHost() != null) {
- if (region.getCoprocessorHost().preScannerClose(scanner)) {
- return builder.build(); // bypass
- }
- }
- rsh = scanners.remove(scannerName);
- if (rsh != null) {
- scanner = rsh.s;
- scanner.close();
- regionServer.leases.cancelLease(scannerName);
- if (region != null && region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postScannerClose(scanner);
- }
- }
+ closeScanner(region, scanner, scannerName);
}
if (ttl > 0) {
@@ -2704,6 +2702,32 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
+ /**
+ * Closes the scanner registered under {@code scannerName}, invoking the coprocessor
+ * pre/post scanner-close hooks when the region has a coprocessor host.
+ * @param region region whose coprocessor host, if any, is consulted; null is tolerated
+ * @param scanner scanner handed to {@code preScannerClose}; the scanner that is actually closed
+ * is the one held in the {@code scanners} map under {@code scannerName}
+ * @param scannerName key of the scanner in the {@code scanners} map and name of its lease
+ * @return true if {@code preScannerClose} returned true (bypass; nothing is removed or closed),
+ * false otherwise, including when no scanner was registered under the name
+ * @throws IOException propagated from the calls made in this method
+ */
+ private boolean closeScanner(Region region, RegionScanner scanner, String scannerName)
+ throws IOException {
+ if (region != null && region.getCoprocessorHost() != null) {
+ if (region.getCoprocessorHost().preScannerClose(scanner)) {
+ return true; // bypass
+ }
+ }
+ // Removing the holder first means a second call for the same name finds nothing to close.
+ RegionScannerHolder rsh = scanners.remove(scannerName);
+ if (rsh != null) {
+ // From here on use the registered scanner rather than the one passed in.
+ scanner = rsh.s;
+ scanner.close();
+ try {
+ regionServer.leases.cancelLease(scannerName);
+ } catch (LeaseException le) {
+ // No problem, ignore
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
+ }
+ }
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postScannerClose(scanner);
+ }
+ }
+ return false;
+ }
+
@Override
public CoprocessorServiceResponse execRegionServerService(RpcController controller,
CoprocessorServiceRequest request) throws ServiceException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 54b79c3..e15a9f6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1972,6 +1972,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return HRegion.createHRegion(info, rootDir, conf, htd);
}
+ /**
+ * Creates a table descriptor with a single column family and a max-versions setting of 1.
+ * @param tableName name of the table
+ * @param family name of the only column family
+ * @return the new descriptor
+ */
+ public HTableDescriptor createTableDescriptor(final TableName tableName,
+ byte[] family) {
+ return createTableDescriptor(tableName, new byte[][] {family}, 1);
+ }
+
+ /**
+ * Creates a table descriptor with one column family per entry of {@code families}, each
+ * configured with the same {@code maxVersions}.
+ * @param tableName name of the table
+ * @param families names of the column families to add, in order
+ * @param maxVersions value passed to {@code setMaxVersions} on every family
+ * @return the new descriptor
+ */
+ public HTableDescriptor createTableDescriptor(final TableName tableName,
+ byte[][] families, int maxVersions) {
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ for (byte[] family : families) {
+ HColumnDescriptor hcd = new HColumnDescriptor(family)
+ .setMaxVersions(maxVersions);
+ desc.addFamily(hcd);
+ }
+ return desc;
+ }
+
/**
* Create an HRegion that writes to the local tmp dirs
* @param desc
@@ -2186,7 +2202,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
Put put = new Put(row);
put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
for (int i = 0; i < f.length; i++) {
- put.add(f[i], null, value != null ? value : row);
+ put.add(f[i], f[i], value != null ? value : row);
}
puts.add(put);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index c801839..0b2ba83 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -37,13 +36,14 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.NavigableSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.mapreduce.Cluster;
import org.apache.log4j.Level;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
@@ -65,8 +65,11 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -92,11 +95,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -587,6 +593,71 @@ public class TestFromClientSide {
assertEquals(rowCount - endKeyCount, countGreater);
}
+ /**
+ * This is a coprocessor to inject a test failure so that a store scanner.reseek() call will
+ * fail with an IOException() on the first call.
+ */
+ public static class ExceptionInReseekRegionObserver extends BaseRegionObserver {
+ // Number of reseek() calls seen by the wrapped scanners. Static, so it is shared by every
+ // instance of this observer and is never reset here.
+ static AtomicLong reqCount = new AtomicLong(0);
+ // StoreScanner whose selected scanners are wrapped so that reseek() can be made to fail.
+ class MyStoreScanner extends StoreScanner {
+ public MyStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
+ long readPt) throws IOException {
+ super(store, scanInfo, scan, columns, readPt);
+ }
+
+ @Override
+ protected List<KeyValueScanner> selectScannersFrom(
+ List<? extends KeyValueScanner> allScanners) {
+ List<KeyValueScanner> scanners = super.selectScannersFrom(allScanners);
+ List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
+ for (KeyValueScanner scanner : scanners) {
+ newScanners.add(new DelegatingKeyValueScanner(scanner) {
+ @Override
+ public boolean reseek(Cell key) throws IOException {
+ // Only the very first reseek() counted by reqCount throws; later calls delegate.
+ if (reqCount.incrementAndGet() == 1) {
+ throw new IOException("Injected exception");
+ }
+ return super.reseek(key);
+ }
+ });
+ }
+ return newScanners;
+ }
+ }
+
+ // Returns a MyStoreScanner built with Long.MAX_VALUE as the read point; the incoming
+ // scanner s is not used.
+ @Override
+ public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s)
+ throws IOException {
+ return new MyStoreScanner(store, store.getScanInfo(), scan, targetCols, Long.MAX_VALUE);
+ }
+ }
+
+ /**
+ * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek
+ * leaving the server side RegionScanner to be in dirty state. The client has to ensure that the
+ * ClientScanner does not get an exception and also sees all the data.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testClientScannerIsResetWhenScanThrowsIOException()
+ throws IOException, InterruptedException {
+ // NOTE(review): presumably turns on the client-side scanner activity logging path - confirm.
+ TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
+ TableName name = TableName.valueOf("testClientScannerIsResetWhenScanThrowsIOException");
+
+ HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
+ // Attach the failure-injecting observer to the table under test.
+ htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
+ TEST_UTIL.getHBaseAdmin().createTable(htd);
+ try (Table t = TEST_UTIL.getConnection().getTable(name)) {
+ int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
+ TEST_UTIL.getHBaseAdmin().flush(name);
+ // The scan must return every loaded row even though one reseek() fails on the server.
+ int actualRowCount = countRows(t, new Scan().addColumn(FAMILY, FAMILY));
+ assertEquals(rowCount, actualRowCount);
+ }
+ // Guards against a vacuous pass: reseek() must have been reached at least once.
+ assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
+ }
+
/*
* @param key
* @return Scan with RowFilter that does LESS than passed key.
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
index 3e915e1..0e2b670 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
@@ -183,7 +183,7 @@ public class TestTableSnapshotScanner {
}
for (int j = 0; j < FAMILIES.length; j++) {
- byte[] actual = result.getValue(FAMILIES[j], null);
+ byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ " ,actual:" + Bytes.toString(actual), row, actual);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
index 7d1267a..3df4a8f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
@@ -182,7 +182,7 @@ public abstract class TableSnapshotInputFormatTestBase {
}
for (int j = 0; j < FAMILIES.length; j++) {
- byte[] actual = result.getValue(FAMILIES[j], null);
+ byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ " ,actual:" + Bytes.toString(actual), row, actual);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
index 9a81990..1cd2432 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
@@ -100,6 +100,7 @@ public class TestMultithreadedTableMapper {
* @param context
* @throws IOException
*/
+ @Override
public void map(ImmutableBytesWritable key, Result value,
Context context)
throws IOException, InterruptedException {
@@ -113,7 +114,7 @@ public class TestMultithreadedTableMapper {
Bytes.toString(INPUT_FAMILY) + "'.");
}
// Get the original value and reverse it
- String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
// Now set the value to be collected
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
index 6fb9460..fde1cd9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
@@ -51,6 +51,7 @@ import org.junit.experimental.categories.Category;
public class TestTableMapReduce extends TestTableMapReduceBase {
private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
+ @Override
protected Log getLog() { return LOG; }
/**
@@ -66,6 +67,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
* @param context
* @throws IOException
*/
+ @Override
public void map(ImmutableBytesWritable key, Result value,
Context context)
throws IOException, InterruptedException {
@@ -80,7 +82,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
}
// Get the original value and reverse it
- String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
// Now set the value to be collected
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
index 022d4c9..9995eb4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
@@ -125,7 +125,7 @@ public abstract class TestTableMapReduceBase {
// Get the original value and reverse it
- String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d307ad19/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
new file mode 100644
index 0000000..0711c62
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+/**
+ * A {@link KeyValueScanner} that forwards every call to the wrapped {@code delegate} and
+ * returns whatever the delegate returns. The delegate field is protected, so a subclass can
+ * override individual methods and still reach the wrapped scanner.
+ */
+public class DelegatingKeyValueScanner implements KeyValueScanner {
+ // The wrapped scanner that receives every forwarded call.
+ protected KeyValueScanner delegate;
+
+ /**
+ * @param delegate the scanner that all calls are forwarded to; stored as-is, no null check
+ */
+ public DelegatingKeyValueScanner(KeyValueScanner delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public Cell peek() {
+ return delegate.peek();
+ }
+
+ @Override
+ public Cell next() throws IOException {
+ return delegate.next();
+ }
+
+ @Override
+ public boolean seek(Cell key) throws IOException {
+ return delegate.seek(key);
+ }
+
+ @Override
+ public boolean reseek(Cell key) throws IOException {
+ return delegate.reseek(key);
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+
+ @Override
+ public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
+ return delegate.shouldUseScanner(scan, store, oldestUnexpiredTS);
+ }
+
+ @Override
+ public boolean requestSeek(Cell kv, boolean forward, boolean useBloom) throws IOException {
+ return delegate.requestSeek(kv, forward, useBloom);
+ }
+
+ @Override
+ public long getSequenceID() {
+ return delegate.getSequenceID();
+ }
+
+ @Override
+ public boolean realSeekDone() {
+ return delegate.realSeekDone();
+ }
+
+ @Override
+ public void enforceSeek() throws IOException {
+ delegate.enforceSeek();
+ }
+
+ @Override
+ public boolean isFileScanner() {
+ return delegate.isFileScanner();
+ }
+
+ @Override
+ public boolean backwardSeek(Cell key) throws IOException {
+ return delegate.backwardSeek(key);
+ }
+
+ @Override
+ public boolean seekToPreviousRow(Cell key) throws IOException {
+ return delegate.seekToPreviousRow(key);
+ }
+
+ @Override
+ public boolean seekToLastRow() throws IOException {
+ return delegate.seekToLastRow();
+ }
+
+ @Override
+ public Cell getNextIndexedKey() {
+ return delegate.getNextIndexedKey();
+ }
+}
[3/3] hbase git commit: HBASE-16604 Scanner retries on IOException
can cause the scans to miss data
Posted by en...@apache.org.
HBASE-16604 Scanner retries on IOException can cause the scans to miss data
Conflicts:
hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9d424c20
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9d424c20
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9d424c20
Branch: refs/heads/branch-1.1
Commit: 9d424c20c1caabcabf874a3d4f6b774e83886b57
Parents: 97ce640
Author: Enis Soztutar <en...@apache.org>
Authored: Thu Sep 22 17:41:01 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Thu Sep 22 18:44:19 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/UnknownScannerException.java | 4 +
.../hadoop/hbase/client/ClientScanner.java | 4 +-
.../hadoop/hbase/client/ScannerCallable.java | 26 +++--
.../hbase/exceptions/ScannerResetException.java | 50 +++++++++
.../hbase/ipc/MetricsHBaseServerSource.java | 2 +
.../hbase/ipc/MetricsHBaseServerSourceImpl.java | 8 ++
.../hadoop/hbase/client/VersionInfoUtil.java | 63 +++++++++++
.../hadoop/hbase/ipc/MetricsHBaseServer.java | 3 +
.../hbase/regionserver/RSRpcServices.java | 65 +++++++----
.../hadoop/hbase/HBaseTestingUtility.java | 18 ++-
.../hadoop/hbase/client/TestFromClientSide.java | 77 ++++++++++++-
.../hbase/client/TestTableSnapshotScanner.java | 2 +-
.../TableSnapshotInputFormatTestBase.java | 2 +-
.../mapreduce/TestMultithreadedTableMapper.java | 3 +-
.../hbase/mapreduce/TestTableMapReduce.java | 4 +-
.../hbase/mapreduce/TestTableMapReduceBase.java | 2 +-
.../regionserver/DelegatingKeyValueScanner.java | 109 +++++++++++++++++++
17 files changed, 402 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
index b951221..3e7b22d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
@@ -45,4 +45,8 @@ public class UnknownScannerException extends DoNotRetryIOException {
public UnknownScannerException(String s) {
super(s);
}
+
+ /**
+ * Constructor
+ * @param s message
+ * @param e exception handed to the superclass constructor together with the message
+ */
+ public UnknownScannerException(String s, Exception e) {
+ super(s, e);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index c504d9b..4a5e635 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
@@ -433,7 +434,8 @@ public class ClientScanner extends AbstractClientScanner {
if ((cause != null && cause instanceof NotServingRegionException) ||
(cause != null && cause instanceof RegionServerStoppedException) ||
e instanceof OutOfOrderScannerNextException ||
- e instanceof UnknownScannerException ) {
+ e instanceof UnknownScannerException ||
+ e instanceof ScannerResetException) {
// Pass. It is easier writing the if loop test as list of what is allowed rather than
// as a list of what is not allowed... so if in here, it means we do not throw.
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index fc21efa..0a22f37 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -104,9 +105,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
* @param connection which connection
* @param tableName table callable is on
* @param scan the scan to execute
- * @param scanMetrics the ScanMetrics to used, if it is null,
+ * @param scanMetrics the ScanMetrics to used, if it is null,
* ScannerCallable won't collect metrics
- * @param rpcControllerFactory factory to use when creating
+ * @param rpcControllerFactory factory to use when creating
* {@link com.google.protobuf.RpcController}
*/
public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
@@ -265,14 +266,19 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
if (e instanceof RemoteException) {
ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
}
- if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
- try {
- HRegionLocation location =
- getConnection().relocateRegion(getTableName(), scan.getStartRow());
- LOG.info("Scanner=" + scannerId
- + " expired, current region location is " + location.toString());
- } catch (Throwable t) {
- LOG.info("Failed to relocate region", t);
+ if (logScannerActivity) {
+ if (ioe instanceof UnknownScannerException) {
+ try {
+ HRegionLocation location =
+ getConnection().relocateRegion(getTableName(), scan.getStartRow());
+ LOG.info("Scanner=" + scannerId
+ + " expired, current region location is " + location.toString());
+ } catch (Throwable t) {
+ LOG.info("Failed to relocate region", t);
+ }
+ } else if (ioe instanceof ScannerResetException) {
+ LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
+ + "asked us to reset the scanner state.", ioe);
}
}
// The below convertion of exceptions into DoNotRetryExceptions is a little strange.
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
new file mode 100644
index 0000000..7689eb1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.exceptions;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown when the server side has received an Exception, and asks the Client to reset the scanner
+ * state by closing the current region scanner, and reopening from the start of last seen row.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ScannerResetException extends DoNotRetryIOException {
+ private static final long serialVersionUID = -5649728171144849619L;
+
+ /** Constructor without a message or an underlying exception. */
+ public ScannerResetException() {
+ super();
+ }
+
+ /**
+ * Constructor
+ * @param s message
+ */
+ public ScannerResetException(String s) {
+ super(s);
+ }
+
+ /**
+ * Constructor
+ * @param s message
+ * @param e exception handed to the superclass constructor together with the message
+ */
+ public ScannerResetException(String s, Exception e) {
+ super(s, e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 482fdba..6676d93 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -64,6 +64,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
String EXCEPTIONS_OOO_NAME="exceptions.OutOfOrderScannerNextException";
String EXCEPTIONS_BUSY_NAME="exceptions.RegionTooBusyException";
String EXCEPTIONS_UNKNOWN_NAME="exceptions.UnknownScannerException";
+ String EXCEPTIONS_SCANNER_RESET_NAME="exceptions.ScannerResetException";
String EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException";
String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException";
String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException";
@@ -86,6 +87,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
void movedRegionException();
void notServingRegionException();
void unknownScannerException();
+ void scannerResetException();
void tooBusyException();
void sentBytes(long count);
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index 18226df..676cf36 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -43,6 +43,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
private final MutableCounterLong exceptionsOOO;
private final MutableCounterLong exceptionsBusy;
private final MutableCounterLong exceptionsUnknown;
+ private final MutableCounterLong exceptionsScannerReset;
private final MutableCounterLong exceptionsSanity;
private final MutableCounterLong exceptionsNSRE;
private final MutableCounterLong exceptionsMoved;
@@ -72,6 +73,8 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
.newCounter(EXCEPTIONS_BUSY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsUnknown = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_UNKNOWN_NAME, EXCEPTIONS_TYPE_DESC, 0L);
+ this.exceptionsScannerReset = this.getMetricsRegistry()
+ .newCounter(EXCEPTIONS_SCANNER_RESET_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsSanity = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_SANITY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsMoved = this.getMetricsRegistry()
@@ -141,6 +144,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
}
@Override
+ public void scannerResetException() {
+ exceptionsScannerReset.incr();
+ }
+
+ @Override
public void tooBusyException() {
exceptionsBusy.incr();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
new file mode 100644
index 0000000..f11fd9e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+
+
+/**
+ * Static helpers for comparing the version a client reported against a required
+ * {@code major.minor} floor.
+ */
+@InterfaceAudience.Private
+public final class VersionInfoUtil {
+
+  private VersionInfoUtil() {
+    /* UTIL CLASS ONLY */
+  }
+
+  /**
+   * Checks the version reported by the client of the RPC call currently being served.
+   * When there is no current call, the check is made against a {@code null} version info.
+   *
+   * @param major required major version
+   * @param minor required minor version
+   * @return result of {@link #hasMinimumVersion(RPCProtos.VersionInfo, int, int)} for that client
+   */
+  public static boolean currentClientHasMinimumVersion(int major, int minor) {
+    final RpcCallContext currentCall = RpcServer.getCurrentCall();
+    if (currentCall == null) {
+      return hasMinimumVersion(null, major, minor);
+    }
+    return hasMinimumVersion(currentCall.getClientVersionInfo(), major, minor);
+  }
+
+  /**
+   * Compares the dot-separated version string in {@code versionInfo} with {@code major.minor}.
+   * Only the first two components are looked at; a missing component counts as 0, and the
+   * minor component is parsed only when the major versions are equal.
+   *
+   * @param versionInfo version info to inspect, may be {@code null}
+   * @param major required major version
+   * @param minor required minor version
+   * @return {@code true} if the version is at least {@code major.minor}; {@code false} if it is
+   *         lower, if {@code versionInfo} is {@code null}, or if a needed component is not a number
+   */
+  public static boolean hasMinimumVersion(RPCProtos.VersionInfo versionInfo,
+                                          int major,
+                                          int minor) {
+    if (versionInfo == null) {
+      return false;
+    }
+    try {
+      final String[] parts = versionInfo.getVersion().split("\\.");
+
+      final int clientMajor = parts.length > 0 ? Integer.parseInt(parts[0]) : 0;
+      if (clientMajor > major) {
+        return true;
+      }
+      if (clientMajor < major) {
+        return false;
+      }
+
+      // Same major version: the minor component decides.
+      final int clientMinor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
+      return clientMinor >= minor;
+    } catch (NumberFormatException nfe) {
+      return false;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
index 3ca50ad..cdd607f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@InterfaceAudience.Private
public class MetricsHBaseServer {
@@ -91,6 +92,8 @@ public class MetricsHBaseServer {
source.tooBusyException();
} else if (throwable instanceof UnknownScannerException) {
source.unknownScannerException();
+ } else if (throwable instanceof ScannerResetException) {
+ source.scannerResetException();
} else if (throwable instanceof RegionMovedException) {
source.movedRegionException();
} else if (throwable instanceof NotServingRegionException) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 3217b98..5b087a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -65,12 +65,14 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -2517,13 +2519,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
}
} catch (IOException e) {
- // if we have an exception on scanner next and we are using the callSeq
- // we should rollback because the client will retry with the same callSeq
- // and get an OutOfOrderScannerNextException if we don't do so.
- if (rsh != null && request.hasNextCallSeq()) {
- rsh.rollbackNextCallSeq();
+ // The scanner state might be left in a dirty state, so we will tell the Client to
+ // fail this RPC and close the scanner while opening up another one from the start of
+ // row that the client has last seen.
+ closeScanner(region, scanner, scannerName);
+
+ // We closed the scanner already. Instead of throwing the IOException, and client
+ // retrying with the same scannerId only to get USE on the next RPC, we directly throw
+ // a special exception to save an RPC.
+ RpcCallContext context = RpcServer.getCurrentCall();
+ if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
+ // 1.4.0+ clients know how to handle
+ throw new ScannerResetException("Scanner is closed on the server-side", e);
+ } else {
+ // older clients do not know about SRE. Just throw USE, which they will handle
+ throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
+ + " scanner state for clients older than 1.3.", e);
}
- throw e;
} finally {
// We're done. On way out re-add the above removed lease.
// Adding resets expiration time on lease.
@@ -2537,20 +2549,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (!moreResults || closeScanner) {
ttl = 0;
moreResults = false;
- if (region != null && region.getCoprocessorHost() != null) {
- if (region.getCoprocessorHost().preScannerClose(scanner)) {
- return builder.build(); // bypass
- }
- }
- rsh = scanners.remove(scannerName);
- if (rsh != null) {
- scanner = rsh.s;
- scanner.close();
- regionServer.leases.cancelLease(scannerName);
- if (region != null && region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postScannerClose(scanner);
- }
- }
+ closeScanner(region, scanner, scannerName);
}
if (ttl > 0) {
@@ -2581,6 +2580,32 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
+ /**
+  * Removes the scanner registered under {@code scannerName}, closes it and cancels its lease,
+  * invoking the region's pre/post scanner-close coprocessor hooks when a coprocessor host exists.
+  *
+  * @param region region the scanner belongs to, may be {@code null}
+  * @param scanner scanner handed to the pre-close hook
+  * @param scannerName key of the scanner in the scanner registry and in the lease table
+  * @return {@code true} if the pre-close hook asked to bypass (nothing is closed then),
+  *         {@code false} otherwise
+  * @throws IOException declared for the hook and scanner-close calls
+  */
+ private boolean closeScanner(Region region, RegionScanner scanner, String scannerName)
+     throws IOException {
+   if (region != null && region.getCoprocessorHost() != null
+       && region.getCoprocessorHost().preScannerClose(scanner)) {
+     return true; // bypass
+   }
+   RegionScannerHolder holder = scanners.remove(scannerName);
+   if (holder == null) {
+     // Nothing registered under this name, so there is nothing to close.
+     return false;
+   }
+   // From here on work with the scanner the holder tracks rather than the one passed in.
+   RegionScanner registered = holder.s;
+   registered.close();
+   try {
+     regionServer.leases.cancelLease(scannerName);
+   } catch (LeaseException ignored) {
+     // No problem, ignore
+     if (LOG.isTraceEnabled()) {
+       LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
+     }
+   }
+   if (region != null && region.getCoprocessorHost() != null) {
+     region.getCoprocessorHost().postScannerClose(registered);
+   }
+   return false;
+ }
+
@Override
public CoprocessorServiceResponse execRegionServerService(RpcController controller,
CoprocessorServiceRequest request) throws ServiceException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 44ec24a..b06e489 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1953,6 +1953,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return HRegion.createHRegion(info, rootDir, conf, htd);
}
+ /**
+  * Builds a table descriptor with a single column family that keeps one version.
+  * @param tableName name of the table to describe
+  * @param family the only column family of the table
+  * @return the descriptor built by the multi-family overload
+  */
+ public HTableDescriptor createTableDescriptor(final TableName tableName,
+     byte[] family) {
+   final byte[][] singleFamily = new byte[][] {family};
+   return createTableDescriptor(tableName, singleFamily, 1);
+ }
+
+ /**
+  * Builds a table descriptor with one column family per entry of {@code families}.
+  * @param tableName name of the table to describe
+  * @param families column families to add, in array order
+  * @param maxVersions max-versions setting applied to every family
+  * @return the populated descriptor
+  */
+ public HTableDescriptor createTableDescriptor(final TableName tableName,
+     byte[][] families, int maxVersions) {
+   final HTableDescriptor descriptor = new HTableDescriptor(tableName);
+   for (int i = 0; i < families.length; i++) {
+     descriptor.addFamily(new HColumnDescriptor(families[i]).setMaxVersions(maxVersions));
+   }
+   return descriptor;
+ }
+
/**
* Create an HRegion that writes to the local tmp dirs
* @param desc
@@ -2164,7 +2180,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
Put put = new Put(row);
put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
for (int i = 0; i < f.length; i++) {
- put.add(f[i], null, value != null ? value : row);
+ put.add(f[i], f[i], value != null ? value : row);
}
puts.add(put);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 5aba902..aa8936c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -37,13 +36,14 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.NavigableSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.mapreduce.Cluster;
import org.apache.log4j.Level;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
@@ -65,8 +65,11 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -92,11 +95,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -588,6 +594,71 @@ public class TestFromClientSide {
assertEquals(rowCount - endKeyCount, countGreater);
}
+ /**
+ * Coprocessor that injects a test failure: the very first reseek() call made on any scanner
+ * wrapped by {@link MyStoreScanner} throws an IOException. Every later call, on any wrapped
+ * scanner, is forwarded to the real scanner.
+ */
+ public static class ExceptionInReseekRegionObserver extends BaseRegionObserver {
+ // Counts reseek() calls across all wrapped scanners. Static so the test can read it without
+ // an observer instance; final because the counter object itself is never replaced.
+ static final AtomicLong reqCount = new AtomicLong(0);
+
+ /**
+ * StoreScanner whose selected scanners are wrapped so that reseek() can fail once.
+ * Declared static because it only touches the static {@code reqCount} and therefore needs
+ * no reference to the enclosing observer instance.
+ */
+ static class MyStoreScanner extends StoreScanner {
+ public MyStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
+ long readPt) throws IOException {
+ super(store, scanInfo, scan, columns, readPt);
+ }
+
+ /**
+ * Wraps each scanner chosen by the superclass in a delegating scanner that throws on the
+ * first reseek() counted by {@code reqCount} and forwards all other calls.
+ */
+ @Override
+ protected List<KeyValueScanner> selectScannersFrom(
+ List<? extends KeyValueScanner> allScanners) {
+ List<KeyValueScanner> scanners = super.selectScannersFrom(allScanners);
+ List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
+ for (KeyValueScanner scanner : scanners) {
+ newScanners.add(new DelegatingKeyValueScanner(scanner) {
+ @Override
+ public boolean reseek(Cell key) throws IOException {
+ // Only the call that moves the shared counter from 0 to 1 fails.
+ if (reqCount.incrementAndGet() == 1) {
+ throw new IOException("Injected exception");
+ }
+ return super.reseek(key);
+ }
+ });
+ }
+ return newScanners;
+ }
+ }
+
+ /**
+ * Returns a new {@link MyStoreScanner} for the store being opened. The incoming scanner
+ * {@code s} is not used, and Long.MAX_VALUE is passed as the read point.
+ */
+ @Override
+ public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s)
+ throws IOException {
+ return new MyStoreScanner(store, store.getScanInfo(), scan, targetCols, Long.MAX_VALUE);
+ }
+ }
+
+ /**
+ * Tests the case where a Scan throws an IOException in the middle of a seek / reseek, leaving
+ * the server side RegionScanner in a dirty state. The ClientScanner must not surface the
+ * exception to the caller and must still return all of the data.
+ * @throws IOException if thrown by any of the admin, load or scan calls made by the test
+ * @throws InterruptedException if thrown by any of the calls made by the test
+ */
+ @Test
+ public void testClientScannerIsResetWhenScanThrowsIOException()
+ throws IOException, InterruptedException {
+ TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
+ TableName name = TableName.valueOf("testClientScannerIsResetWhenScanThrowsIOException");
+
+ // Attach the failure-injecting observer to this table's descriptor only.
+ HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
+ htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
+ TEST_UTIL.getHBaseAdmin().createTable(htd);
+ try (Table t = TEST_UTIL.getConnection().getTable(name)) {
+ int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
+ // NOTE(review): presumably the flush is needed so the scan reads from store files and
+ // reaches the wrapped scanners' reseek() - confirm.
+ TEST_UTIL.getHBaseAdmin().flush(name);
+ // Qualifier equals the family name; presumably this matches what loadTable() writes -
+ // verify against HBaseTestingUtility.loadTable.
+ int actualRowCount = countRows(t, new Scan().addColumn(FAMILY, FAMILY));
+ assertEquals(rowCount, actualRowCount);
+ }
+ // The observer throws on the first counted reseek(), so a non-zero count means the
+ // injected IOException was actually raised during the scan.
+ assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
+ }
+
/*
* @param key
* @return Scan with RowFilter that does LESS than passed key.
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
index 3e915e1..0e2b670 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
@@ -183,7 +183,7 @@ public class TestTableSnapshotScanner {
}
for (int j = 0; j < FAMILIES.length; j++) {
- byte[] actual = result.getValue(FAMILIES[j], null);
+ byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ " ,actual:" + Bytes.toString(actual), row, actual);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
index b35a5d3..190c245 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
@@ -180,7 +180,7 @@ public abstract class TableSnapshotInputFormatTestBase {
}
for (int j = 0; j < FAMILIES.length; j++) {
- byte[] actual = result.getValue(FAMILIES[j], null);
+ byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ " ,actual:" + Bytes.toString(actual), row, actual);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
index 7ce4a63..fe3d9d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
@@ -98,6 +98,7 @@ public class TestMultithreadedTableMapper {
* @param context
* @throws IOException
*/
+ @Override
public void map(ImmutableBytesWritable key, Result value,
Context context)
throws IOException, InterruptedException {
@@ -111,7 +112,7 @@ public class TestMultithreadedTableMapper {
Bytes.toString(INPUT_FAMILY) + "'.");
}
// Get the original value and reverse it
- String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
// Now set the value to be collected
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
index 6fb9460..fde1cd9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
@@ -51,6 +51,7 @@ import org.junit.experimental.categories.Category;
public class TestTableMapReduce extends TestTableMapReduceBase {
private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
+ @Override
protected Log getLog() { return LOG; }
/**
@@ -66,6 +67,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
* @param context
* @throws IOException
*/
+ @Override
public void map(ImmutableBytesWritable key, Result value,
Context context)
throws IOException, InterruptedException {
@@ -80,7 +82,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
}
// Get the original value and reverse it
- String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
// Now set the value to be collected
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
index 084b80f..1e2efb6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
@@ -122,7 +122,7 @@ public abstract class TestTableMapReduceBase {
// Get the original value and reverse it
- String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
new file mode 100644
index 0000000..c680654
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.SortedSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+
+/**
+ * {@link KeyValueScanner} that forwards every method call, unchanged, to a wrapped scanner.
+ * Usable as a base class: a subclass overrides only the methods it wants to alter and
+ * inherits plain forwarding for the rest.
+ */
+public class DelegatingKeyValueScanner implements KeyValueScanner {
+ // Scanner that receives every forwarded call. The constructor does not null-check it.
+ protected KeyValueScanner delegate;
+
+ /**
+ * @param delegate scanner to which all calls are forwarded
+ */
+ public DelegatingKeyValueScanner(KeyValueScanner delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public Cell peek() {
+ return delegate.peek();
+ }
+
+ @Override
+ public Cell next() throws IOException {
+ return delegate.next();
+ }
+
+ @Override
+ public boolean seek(Cell key) throws IOException {
+ return delegate.seek(key);
+ }
+
+ @Override
+ public boolean reseek(Cell key) throws IOException {
+ return delegate.reseek(key);
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+
+ @Override
+ public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
+ return delegate.shouldUseScanner(scan, columns, oldestUnexpiredTS);
+ }
+
+ @Override
+ public boolean requestSeek(Cell kv, boolean forward, boolean useBloom) throws IOException {
+ return delegate.requestSeek(kv, forward, useBloom);
+ }
+
+ @Override
+ public long getSequenceID() {
+ return delegate.getSequenceID();
+ }
+
+ @Override
+ public boolean realSeekDone() {
+ return delegate.realSeekDone();
+ }
+
+ @Override
+ public void enforceSeek() throws IOException {
+ delegate.enforceSeek();
+ }
+
+ @Override
+ public boolean isFileScanner() {
+ return delegate.isFileScanner();
+ }
+
+ @Override
+ public boolean backwardSeek(Cell key) throws IOException {
+ return delegate.backwardSeek(key);
+ }
+
+ @Override
+ public boolean seekToPreviousRow(Cell key) throws IOException {
+ return delegate.seekToPreviousRow(key);
+ }
+
+ @Override
+ public boolean seekToLastRow() throws IOException {
+ return delegate.seekToLastRow();
+ }
+
+ @Override
+ public Cell getNextIndexedKey() {
+ return delegate.getNextIndexedKey();
+ }
+}