You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2016/09/23 18:37:10 UTC
[2/2] hbase git commit: HBASE-16604 Scanner retries on IOException
can cause the scans to miss data - RECOMMIT after revert
HBASE-16604 Scanner retries on IOException can cause the scans to miss data - RECOMMIT after revert
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/eb112783
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/eb112783
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/eb112783
Branch: refs/heads/master
Commit: eb112783ae11dbb23f74f453f22a0301c6b2b11f
Parents: 39db0ca
Author: Enis Soztutar <en...@apache.org>
Authored: Fri Sep 23 11:27:13 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Sep 23 11:27:13 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/UnknownScannerException.java | 4 +
.../hadoop/hbase/client/ClientScanner.java | 4 +-
.../hadoop/hbase/client/ScannerCallable.java | 26 +++--
.../hbase/exceptions/ScannerResetException.java | 50 ++++++++
.../hbase/ipc/MetricsHBaseServerSource.java | 2 +
.../hbase/ipc/MetricsHBaseServerSourceImpl.java | 8 ++
.../hadoop/hbase/ipc/MetricsHBaseServer.java | 3 +
.../hbase/regionserver/RSRpcServices.java | 76 ++++++++-----
.../hadoop/hbase/HBaseTestingUtility.java | 27 +++--
.../hadoop/hbase/client/TestFromClientSide.java | 75 +++++++++++-
.../hbase/client/TestTableSnapshotScanner.java | 2 +-
.../TableSnapshotInputFormatTestBase.java | 2 +-
.../mapreduce/TestMultithreadedTableMapper.java | 3 +-
.../hbase/mapreduce/TestTableMapReduce.java | 5 +-
.../hbase/mapreduce/TestTableMapReduceBase.java | 2 +-
.../regionserver/DelegatingKeyValueScanner.java | 114 +++++++++++++++++++
16 files changed, 350 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
index b951221..3e7b22d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
@@ -45,4 +45,8 @@ public class UnknownScannerException extends DoNotRetryIOException {
public UnknownScannerException(String s) {
super(s);
}
+
+ public UnknownScannerException(String s, Exception e) {
+ super(s, e);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 3e676c7..de8bfcc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
@@ -428,7 +429,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
if ((cause != null && cause instanceof NotServingRegionException) ||
(cause != null && cause instanceof RegionServerStoppedException) ||
e instanceof OutOfOrderScannerNextException ||
- e instanceof UnknownScannerException ) {
+ e instanceof UnknownScannerException ||
+ e instanceof ScannerResetException) {
// Pass. It is easier writing the if loop test as list of what is allowed rather than
// as a list of what is not allowed... so if in here, it means we do not throw.
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index adf1153..8345aa1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -102,7 +102,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
* @param scan the scan to execute
* @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
* metrics
- * @param rpcControllerFactory factory to use when creating
+ * @param rpcControllerFactory factory to use when creating
* {@link com.google.protobuf.RpcController}
*/
public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
@@ -174,6 +174,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
}
}
+ @Override
protected Result [] rpcCall() throws Exception {
if (Thread.interrupted()) {
throw new InterruptedIOException();
@@ -245,14 +246,19 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
if (e instanceof RemoteException) {
ioe = ((RemoteException) e).unwrapRemoteException();
}
- if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
- try {
- HRegionLocation location =
- getConnection().relocateRegion(getTableName(), scan.getStartRow());
- LOG.info("Scanner=" + scannerId + " expired, current region location is " +
- location.toString());
- } catch (Throwable t) {
- LOG.info("Failed to relocate region", t);
+ if (logScannerActivity) {
+ if (ioe instanceof UnknownScannerException) {
+ try {
+ HRegionLocation location =
+ getConnection().relocateRegion(getTableName(), scan.getStartRow());
+ LOG.info("Scanner=" + scannerId + " expired, current region location is " +
+ location.toString());
+ } catch (Throwable t) {
+ LOG.info("Failed to relocate region", t);
+ }
+ } else if (ioe instanceof ScannerResetException) {
+ LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
+ + "asked us to reset the scanner state.", ioe);
}
}
// The below convertion of exceptions into DoNotRetryExceptions is a little strange.
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
new file mode 100644
index 0000000..7689eb1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.exceptions;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown when the server side has received an Exception, and asks the Client to reset the scanner
+ * state by closing the current region scanner, and reopening from the start of last seen row.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ScannerResetException extends DoNotRetryIOException {
+ private static final long serialVersionUID = -5649728171144849619L;
+
+ /** constructor */
+ public ScannerResetException() {
+ super();
+ }
+
+ /**
+ * Constructor
+ * @param s message
+ */
+ public ScannerResetException(String s) {
+ super(s);
+ }
+
+ public ScannerResetException(String s, Exception e) {
+ super(s, e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index ffbe6fe..cf9c6c7 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -79,6 +79,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
String EXCEPTIONS_OOO_NAME="exceptions.OutOfOrderScannerNextException";
String EXCEPTIONS_BUSY_NAME="exceptions.RegionTooBusyException";
String EXCEPTIONS_UNKNOWN_NAME="exceptions.UnknownScannerException";
+ String EXCEPTIONS_SCANNER_RESET_NAME="exceptions.ScannerResetException";
String EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException";
String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException";
String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException";
@@ -108,6 +109,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
void movedRegionException();
void notServingRegionException();
void unknownScannerException();
+ void scannerResetException();
void tooBusyException();
void multiActionTooLargeException();
void callQueueTooBigException();
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index fafa9d0..d372b1b 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -45,6 +45,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
private final MutableFastCounter exceptionsOOO;
private final MutableFastCounter exceptionsBusy;
private final MutableFastCounter exceptionsUnknown;
+ private final MutableFastCounter exceptionsScannerReset;
private final MutableFastCounter exceptionsSanity;
private final MutableFastCounter exceptionsNSRE;
private final MutableFastCounter exceptionsMoved;
@@ -78,6 +79,8 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
.newCounter(EXCEPTIONS_BUSY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsUnknown = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_UNKNOWN_NAME, EXCEPTIONS_TYPE_DESC, 0L);
+ this.exceptionsScannerReset = this.getMetricsRegistry()
+ .newCounter(EXCEPTIONS_SCANNER_RESET_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsSanity = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_SANITY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsMoved = this.getMetricsRegistry()
@@ -162,6 +165,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
}
@Override
+ public void scannerResetException() {
+ exceptionsScannerReset.incr();
+ }
+
+ @Override
public void tooBusyException() {
exceptionsBusy.incr();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
index 838bdf6..fe03d4f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@InterfaceAudience.Private
public class MetricsHBaseServer {
@@ -103,6 +104,8 @@ public class MetricsHBaseServer {
source.tooBusyException();
} else if (throwable instanceof UnknownScannerException) {
source.unknownScannerException();
+ } else if (throwable instanceof ScannerResetException) {
+ source.scannerResetException();
} else if (throwable instanceof RegionMovedException) {
source.movedRegionException();
} else if (throwable instanceof NotServingRegionException) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 6f92f9d..5ba8afd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -2901,13 +2902,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
isClientCellBlockSupport(context));
}
} catch (IOException e) {
- // if we have an exception on scanner next and we are using the callSeq
- // we should rollback because the client will retry with the same callSeq
- // and get an OutOfOrderScannerNextException if we don't do so.
- if (rsh != null && request.hasNextCallSeq()) {
- rsh.rollbackNextCallSeq();
+ // The scanner state might be left in a dirty state, so we will tell the Client to
+ // fail this RPC and close the scanner while opening up another one from the start of
+ // row that the client has last seen.
+ closeScanner(region, scanner, scannerName, context);
+
+ // We closed the scanner already. Instead of throwing the IOException, and client
+ // retrying with the same scannerId only to get USE on the next RPC, we directly throw
+ // a special exception to save an RPC.
+ if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
+ // 1.4.0+ clients know how to handle
+ throw new ScannerResetException("Scanner is closed on the server-side", e);
+ } else {
+ // older clients do not know about SRE. Just throw USE, which they will handle
+ throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
+ + " scanner state for clients older than 1.3.", e);
}
- throw e;
} finally {
if (context != null) {
context.setCallBack(rsh.shippedCallback);
@@ -2926,29 +2936,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (!moreResults || closeScanner) {
ttl = 0;
moreResults = false;
- if (region != null && region.getCoprocessorHost() != null) {
- if (region.getCoprocessorHost().preScannerClose(scanner)) {
- return builder.build(); // bypass
- }
- }
- rsh = scanners.remove(scannerName);
- if (rsh != null) {
- if (context != null) {
- context.setCallBack(rsh.closeCallBack);
- } else {
- rsh.s.close();
- }
- try {
- regionServer.leases.cancelLease(scannerName);
- } catch (LeaseException le) {
- // No problem, ignore
- if (LOG.isTraceEnabled()) {
- LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
- }
- }
- if (region != null && region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postScannerClose(scanner);
- }
+ if (closeScanner(region, scanner, scannerName, context)) {
+ return builder.build(); // bypass
}
}
@@ -2980,6 +2969,35 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
+ private boolean closeScanner(Region region, RegionScanner scanner, String scannerName,
+ RpcCallContext context) throws IOException {
+ if (region != null && region.getCoprocessorHost() != null) {
+ if (region.getCoprocessorHost().preScannerClose(scanner)) {
+ return true; // bypass
+ }
+ }
+ RegionScannerHolder rsh = scanners.remove(scannerName);
+ if (rsh != null) {
+ if (context != null) {
+ context.setCallBack(rsh.closeCallBack);
+ } else {
+ rsh.s.close();
+ }
+ try {
+ regionServer.leases.cancelLease(scannerName);
+ } catch (LeaseException le) {
+ // No problem, ignore
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
+ }
+ }
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().postScannerClose(scanner);
+ }
+ }
+ return false;
+ }
+
@Override
public CoprocessorServiceResponse execRegionServerService(RpcController controller,
CoprocessorServiceRequest request) throws ServiceException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 5a4da45..829661c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1413,12 +1413,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public Table createTable(TableName tableName, byte[][] families,
int numVersions, byte[] startKey, byte[] endKey, int numRegions)
throws IOException{
- HTableDescriptor desc = new HTableDescriptor(tableName);
- for (byte[] family : families) {
- HColumnDescriptor hcd = new HColumnDescriptor(family)
- .setMaxVersions(numVersions);
- desc.addFamily(hcd);
- }
+ HTableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
+
getAdmin().createTable(desc, startKey, endKey, numRegions);
// HBaseAdmin only waits for regions to appear in hbase:meta we
// should wait until they are assigned
@@ -1781,6 +1777,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
}
+ public HTableDescriptor createTableDescriptor(final TableName tableName,
+ byte[] family) {
+ return createTableDescriptor(tableName, new byte[][] {family}, 1);
+ }
+
+ public HTableDescriptor createTableDescriptor(final TableName tableName,
+ byte[][] families, int maxVersions) {
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ for (byte[] family : families) {
+ HColumnDescriptor hcd = new HColumnDescriptor(family)
+ .setMaxVersions(maxVersions);
+ desc.addFamily(hcd);
+ }
+ return desc;
+ }
+
/**
* Create an HRegion that writes to the local tmp dirs
* @param desc
@@ -1998,7 +2010,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
for (int i = 0; i < f.length; i++) {
byte[] value1 = value != null ? value : row;
- put.addColumn(f[i], null, value1);
+ put.addColumn(f[i], f[i], value1);
}
puts.add(put);
}
@@ -3540,6 +3552,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public PortAllocator(Random random) {
this.random = random;
this.portChecker = new AvailablePortChecker() {
+ @Override
public boolean available(int port) {
try {
ServerSocket sock = new ServerSocket(port);
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index f465625..33a5315 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -37,10 +36,12 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.NavigableSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.ArrayUtils;
@@ -63,8 +64,11 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -91,10 +95,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
+import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -536,6 +544,71 @@ public class TestFromClientSide {
assertEquals(rowCount - endKeyCount, countGreater);
}
+ /**
+ * This is a coprocessor to inject a test failure so that a store scanner.reseek() call will
+ * fail with an IOException() on the first call.
+ */
+ public static class ExceptionInReseekRegionObserver extends BaseRegionObserver {
+ static AtomicLong reqCount = new AtomicLong(0);
+ class MyStoreScanner extends StoreScanner {
+ public MyStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
+ long readPt) throws IOException {
+ super(store, scanInfo, scan, columns, readPt);
+ }
+
+ @Override
+ protected List<KeyValueScanner> selectScannersFrom(
+ List<? extends KeyValueScanner> allScanners) {
+ List<KeyValueScanner> scanners = super.selectScannersFrom(allScanners);
+ List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
+ for (KeyValueScanner scanner : scanners) {
+ newScanners.add(new DelegatingKeyValueScanner(scanner) {
+ @Override
+ public boolean reseek(Cell key) throws IOException {
+ if (reqCount.incrementAndGet() == 1) {
+ throw new IOException("Injected exception");
+ }
+ return super.reseek(key);
+ }
+ });
+ }
+ return newScanners;
+ }
+ }
+
+ @Override
+ public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s,
+ final long readPt) throws IOException {
+ return new MyStoreScanner(store, store.getScanInfo(), scan, targetCols, readPt);
+ }
+ }
+
+ /**
+ * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek
+ * leaving the server side RegionScanner to be in dirty state. The client has to ensure that the
+ * ClientScanner does not get an exception and also sees all the data.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testClientScannerIsResetWhenScanThrowsIOException()
+ throws IOException, InterruptedException {
+ TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
+ TableName name = TableName.valueOf("testClientScannerIsResetWhenScanThrowsIOException");
+
+ HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
+ htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
+ TEST_UTIL.getAdmin().createTable(htd);
+ try (Table t = TEST_UTIL.getConnection().getTable(name)) {
+ int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
+ TEST_UTIL.getAdmin().flush(name);
+ int actualRowCount = countRows(t, new Scan().addColumn(FAMILY, FAMILY));
+ assertEquals(rowCount, actualRowCount);
+ }
+ assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
+ }
+
/*
* @param key
* @return Scan with RowFilter that does LESS than passed key.
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
index 0f0baff..8b9428f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
@@ -184,7 +184,7 @@ public class TestTableSnapshotScanner {
}
for (int j = 0; j < FAMILIES.length; j++) {
- byte[] actual = result.getValue(FAMILIES[j], null);
+ byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ " ,actual:" + Bytes.toString(actual), row, actual);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
index 26e5897..66d290a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
@@ -181,7 +181,7 @@ public abstract class TableSnapshotInputFormatTestBase {
}
for (int j = 0; j < FAMILIES.length; j++) {
- byte[] actual = result.getValue(FAMILIES[j], null);
+ byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ " ,actual:" + Bytes.toString(actual), row, actual);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
index 5110ef7..694a359 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
@@ -97,6 +97,7 @@ public class TestMultithreadedTableMapper {
* @param context
* @throws IOException
*/
+ @Override
public void map(ImmutableBytesWritable key, Result value,
Context context)
throws IOException, InterruptedException {
@@ -110,7 +111,7 @@ public class TestMultithreadedTableMapper {
Bytes.toString(INPUT_FAMILY) + "'.");
}
// Get the original value and reverse it
- String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
// Now set the value to be collected
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
index fa5b9a4..690e776 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
@@ -57,6 +57,7 @@ import org.junit.experimental.categories.Category;
public class TestTableMapReduce extends TestTableMapReduceBase {
private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
+ @Override
protected Log getLog() { return LOG; }
/**
@@ -72,6 +73,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
* @param context
* @throws IOException
*/
+ @Override
public void map(ImmutableBytesWritable key, Result value,
Context context)
throws IOException, InterruptedException {
@@ -86,7 +88,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
}
// Get the original value and reverse it
- String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
// Now set the value to be collected
@@ -96,6 +98,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
}
}
+ @Override
protected void runTestOnTable(Table table) throws IOException {
Job job = null;
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
index e78bf4f..27bf063 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
@@ -126,7 +126,7 @@ public abstract class TestTableMapReduceBase {
// Get the original value and reverse it
- String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
http://git-wip-us.apache.org/repos/asf/hbase/blob/eb112783/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
new file mode 100644
index 0000000..51a2a97
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+public class DelegatingKeyValueScanner implements KeyValueScanner {
+ protected KeyValueScanner delegate;
+
+ public DelegatingKeyValueScanner(KeyValueScanner delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void shipped() throws IOException {
+ delegate.shipped();
+ }
+
+ @Override
+ public Cell peek() {
+ return delegate.peek();
+ }
+
+ @Override
+ public Cell next() throws IOException {
+ return delegate.next();
+ }
+
+ @Override
+ public boolean seek(Cell key) throws IOException {
+ return delegate.seek(key);
+ }
+
+ @Override
+ public boolean reseek(Cell key) throws IOException {
+ return delegate.reseek(key);
+ }
+
+ @Override
+ public long getScannerOrder() {
+ return delegate.getScannerOrder();
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+
+ @Override
+ public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
+ return delegate.shouldUseScanner(scan, store, oldestUnexpiredTS);
+ }
+
+ @Override
+ public boolean requestSeek(Cell kv, boolean forward, boolean useBloom) throws IOException {
+ return delegate.requestSeek(kv, forward, useBloom);
+ }
+
+ @Override
+ public boolean realSeekDone() {
+ return delegate.realSeekDone();
+ }
+
+ @Override
+ public void enforceSeek() throws IOException {
+ delegate.enforceSeek();
+ }
+
+ @Override
+ public boolean isFileScanner() {
+ return delegate.isFileScanner();
+ }
+
+ @Override
+ public boolean backwardSeek(Cell key) throws IOException {
+ return delegate.backwardSeek(key);
+ }
+
+ @Override
+ public boolean seekToPreviousRow(Cell key) throws IOException {
+ return delegate.seekToPreviousRow(key);
+ }
+
+ @Override
+ public boolean seekToLastRow() throws IOException {
+ return delegate.seekToLastRow();
+ }
+
+ @Override
+ public Cell getNextIndexedKey() {
+ return delegate.getNextIndexedKey();
+ }
+}
\ No newline at end of file