You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2015/12/11 03:34:37 UTC
hbase git commit: HBASE-14946 Don't allow multi's to over run the max
result size.
Repository: hbase
Updated Branches:
refs/heads/branch-1.2 512144ed2 -> 8953da28c
HBASE-14946 Don't allow multi's to over run the max result size.
Summary:
* Add VersionInfoUtil to determine if a client has a specified version or better
* Add an exception type to say that the response should be chunked
* Add on client knowledge of retry exceptions
* Add on metrics for how often this happens
Test Plan: Added a unit test
Differential Revision: https://reviews.facebook.net/D51771
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8953da28
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8953da28
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8953da28
Branch: refs/heads/branch-1.2
Commit: 8953da28cb3ddc22a56661b35657aaa68f445a7a
Parents: 512144e
Author: Elliott Clark <ec...@apache.org>
Authored: Mon Dec 7 18:33:35 2015 -0800
Committer: Elliott Clark <ec...@apache.org>
Committed: Thu Dec 10 18:32:06 2015 -0800
----------------------------------------------------------------------
.../hadoop/hbase/MultiActionResultTooLarge.java | 31 +++++
.../hadoop/hbase/RetryImmediatelyException.java | 27 ++++
.../hadoop/hbase/client/AsyncProcess.java | 89 +++++++++----
.../hadoop/hbase/client/ConnectionManager.java | 4 +-
.../org/apache/hadoop/hbase/client/Result.java | 3 +
.../hbase/ipc/MetricsHBaseServerSource.java | 8 +-
.../hbase/ipc/MetricsHBaseServerSourceImpl.java | 9 ++
.../hadoop/hbase/client/VersionInfoUtil.java | 63 ++++++++++
.../hadoop/hbase/ipc/MetricsHBaseServer.java | 3 +
.../apache/hadoop/hbase/ipc/RpcCallContext.java | 23 +++-
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 28 ++++-
.../master/procedure/ProcedurePrepareLatch.java | 23 +---
.../hbase/regionserver/RSRpcServices.java | 124 +++++++++++++------
.../hbase/client/TestMultiRespectsLimits.java | 102 +++++++++++++++
14 files changed, 449 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java
new file mode 100644
index 0000000..d06eea1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+/**
+ * Exception thrown when the result needs to be chunked on the server side.
+ * It signals that retries should happen right away and not count against the number of
+ * retries because some of the multi was a success.
+ */
+public class MultiActionResultTooLarge extends RetryImmediatelyException {
+
+ public MultiActionResultTooLarge(String s) {
+ super(s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java
new file mode 100644
index 0000000..1b39904
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+public class RetryImmediatelyException extends IOException {
+ public RetryImmediatelyException(String s) {
+ super(s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 3d55efc..0d093b1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -126,19 +127,36 @@ class AsyncProcess {
public void waitUntilDone() throws InterruptedIOException;
}
- /** Return value from a submit that didn't contain any requests. */
+ /**
+ * Return value from a submit that didn't contain any requests.
+ */
private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
- public final Object[] result = new Object[0];
+
+ final Object[] result = new Object[0];
+
@Override
- public boolean hasError() { return false; }
+ public boolean hasError() {
+ return false;
+ }
+
@Override
- public RetriesExhaustedWithDetailsException getErrors() { return null; }
+ public RetriesExhaustedWithDetailsException getErrors() {
+ return null;
+ }
+
@Override
- public List<? extends Row> getFailedOperations() { return null; }
+ public List<? extends Row> getFailedOperations() {
+ return null;
+ }
+
@Override
- public Object[] getResults() { return result; }
+ public Object[] getResults() {
+ return result;
+ }
+
@Override
- public void waitUntilDone() throws InterruptedIOException {}
+ public void waitUntilDone() throws InterruptedIOException {
+ }
};
/** Sync point for calls to multiple replicas for the same user request (Get).
@@ -306,8 +324,12 @@ class AsyncProcess {
* RuntimeException
*/
private ExecutorService getPool(ExecutorService pool) {
- if (pool != null) return pool;
- if (this.pool != null) return this.pool;
+ if (pool != null) {
+ return pool;
+ }
+ if (this.pool != null) {
+ return this.pool;
+ }
throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
}
@@ -365,7 +387,9 @@ class AsyncProcess {
Row r = it.next();
HRegionLocation loc;
try {
- if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
+ if (r == null) {
+ throw new IllegalArgumentException("#" + id + ", row cannot be null");
+ }
// Make sure we get 0-s replica.
RegionLocations locs = connection.locateRegion(
tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
@@ -728,10 +752,10 @@ class AsyncProcess {
// Normal case: we received an answer from the server, and it's not an exception.
receiveMultiAction(multiAction, server, res, numAttempt);
} catch (Throwable t) {
- // Something really bad happened. We are on the send thread that will now die.
- LOG.error("Internal AsyncProcess #" + id + " error for "
- + tableName + " processing for " + server, t);
- throw new RuntimeException(t);
+ // Something really bad happened. We are on the send thread that will now die.
+ LOG.error("Internal AsyncProcess #" + id + " error for "
+ + tableName + " processing for " + server, t);
+ throw new RuntimeException(t);
} finally {
decTaskCounters(multiAction.getRegions(), server);
if (callsInProgress != null && callable != null) {
@@ -750,19 +774,25 @@ class AsyncProcess {
private final TableName tableName;
private final AtomicLong actionsInProgress = new AtomicLong(-1);
- /** The lock controls access to results. It is only held when populating results where
+ /**
+ * The lock controls access to results. It is only held when populating results where
* there might be several callers (eventual consistency gets). For other requests,
- * there's one unique call going on per result index. */
+ * there's one unique call going on per result index.
+ */
private final Object replicaResultLock = new Object();
- /** Result array. Null if results are not needed. Otherwise, each index corresponds to
+ /**
+ * Result array. Null if results are not needed. Otherwise, each index corresponds to
* the action index in initial actions submitted. For most request types, has null-s for
* requests that are not done, and result/exception for those that are done.
* For eventual-consistency gets, initially the same applies; at some point, replica calls
* might be started, and ReplicaResultState is put at the corresponding indices. The
* returning calls check the type to detect when this is the case. After all calls are done,
- * ReplicaResultState-s are replaced with results for the user. */
+ * ReplicaResultState-s are replaced with results for the user.
+ */
private final Object[] results;
- /** Indices of replica gets in results. If null, all or no actions are replica-gets. */
+ /**
+ * Indices of replica gets in results. If null, all or no actions are replica-gets.
+ */
private final int[] replicaGetIndices;
private final boolean hasAnyReplicaGets;
private final long nonceGroup;
@@ -777,7 +807,9 @@ class AsyncProcess {
this.actionsInProgress.set(actions.size());
if (results != null) {
assert needResults;
- if (results.length != actions.size()) throw new AssertionError("results.length");
+ if (results.length != actions.size()) {
+ throw new AssertionError("results.length");
+ }
this.results = results;
for (int i = 0; i != this.results.length; ++i) {
results[i] = null;
@@ -1177,9 +1209,13 @@ class AsyncProcess {
// We have two contradicting needs here:
// 1) We want to get the new location after having slept, as it may change.
// 2) We want to take into account the location when calculating the sleep time.
+ // 3) If all this is just because the response needed to be chunked try again FAST.
// It should be possible to have some heuristics to take the right decision. Short term,
// we go for one.
- long backOffTime = errorsByServer.calculateBackoffTime(oldServer, pause);
+ boolean retryImmediately = throwable instanceof RetryImmediatelyException;
+ int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
+ long backOffTime = retryImmediately ? 0 :
+ errorsByServer.calculateBackoffTime(oldServer, pause);
if (numAttempt > startLogErrorsCnt) {
// We use this value to have some logs when we have multiple failures, but not too many
// logs, as errors are to be expected when a region moves, splits and so on
@@ -1188,14 +1224,16 @@ class AsyncProcess {
}
try {
- Thread.sleep(backOffTime);
+ if (backOffTime > 0) {
+ Thread.sleep(backOffTime);
+ }
} catch (InterruptedException e) {
LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
Thread.currentThread().interrupt();
return;
}
- groupAndSendMultiAction(toReplay, numAttempt + 1);
+ groupAndSendMultiAction(toReplay, nextAttemptNumber);
}
private void logNoResubmit(ServerName oldServer, int numAttempt,
@@ -1255,6 +1293,7 @@ class AsyncProcess {
// Failure: retry if it's make sense else update the errors lists
if (result == null || result instanceof Throwable) {
Row row = sentAction.getAction();
+ throwable = ConnectionManager.findException(result);
// Register corresponding failures once per server/once per region.
if (!regionFailureRegistered) {
regionFailureRegistered = true;
@@ -1404,7 +1443,9 @@ class AsyncProcess {
// will either see state with callCount 0 after locking it; or will not see state at all
// we will replace it with the result.
synchronized (state) {
- if (state.callCount == 0) return; // someone already set the result
+ if (state.callCount == 0) {
+ return; // someone already set the result
+ }
state.callCount = 0;
}
synchronized (replicaResultLock) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index b68aa8b..8d24874 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
@@ -2701,7 +2702,8 @@ class ConnectionManager {
Throwable cur = (Throwable) exception;
while (cur != null) {
if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
- || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException) {
+ || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
+ || cur instanceof RetryImmediatelyException) {
return cur;
}
if (cur instanceof RemoteException) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index d295953..e764c4e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -913,6 +913,9 @@ public class Result implements CellScannable, CellScanner {
*/
public static long getTotalSizeOfCells(Result result) {
long size = 0;
+ if (result.isEmpty()) {
+ return size;
+ }
for (Cell c : result.rawCells()) {
size += CellUtil.estimatedHeapSizeOf(c);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 5cf71f3..061a672 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -74,6 +74,9 @@ public interface MetricsHBaseServerSource extends BaseSource {
String EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException";
String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException";
String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException";
+ String EXCEPTIONS_MULTI_TOO_LARGE_NAME = "exceptions.multiResponseTooLarge";
+ String EXCEPTIONS_MULTI_TOO_LARGE_DESC = "A response to a multi request was too large and the " +
+ "rest of the requests will have to be retried.";
void authorizationSuccess();
@@ -96,6 +99,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
void notServingRegionException();
void unknownScannerException();
void tooBusyException();
+ void multiActionTooLargeException();
void sentBytes(long count);
@@ -110,4 +114,6 @@ public interface MetricsHBaseServerSource extends BaseSource {
void processedCall(int processingTime);
void queuedAndProcessedCall(int totalTime);
- }
+
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index 8984394..487f9f5 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
implements MetricsHBaseServerSource {
+
private final MetricsHBaseServerWrapper wrapper;
private final MutableCounterLong authorizationSuccesses;
private final MutableCounterLong authorizationFailures;
@@ -47,6 +48,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
private final MutableCounterLong exceptionsSanity;
private final MutableCounterLong exceptionsNSRE;
private final MutableCounterLong exceptionsMoved;
+ private final MutableCounterLong exceptionsMultiTooLarge;
private MutableHistogram queueCallTime;
@@ -81,6 +83,8 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
.newCounter(EXCEPTIONS_MOVED_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsNSRE = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_NSRE_NAME, EXCEPTIONS_TYPE_DESC, 0L);
+ this.exceptionsMultiTooLarge = this.getMetricsRegistry()
+ .newCounter(EXCEPTIONS_MULTI_TOO_LARGE_NAME, EXCEPTIONS_MULTI_TOO_LARGE_DESC, 0L);
this.authenticationSuccesses = this.getMetricsRegistry().newCounter(
AUTHENTICATION_SUCCESSES_NAME, AUTHENTICATION_SUCCESSES_DESC, 0L);
@@ -160,6 +164,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
}
@Override
+ public void multiActionTooLargeException() {
+ exceptionsMultiTooLarge.incr();
+ }
+
+ @Override
public void authenticationSuccess() {
authenticationSuccesses.incr();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
new file mode 100644
index 0000000..c405518
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+
+
+/**
+ * Class to help with parsing the version info.
+ */
+@InterfaceAudience.Private
+public final class VersionInfoUtil {
+
+ private VersionInfoUtil() {
+ /* UTIL CLASS ONLY */
+ }
+
+ public static boolean currentClientHasMinimumVersion(int major, int minor) {
+ RpcCallContext call = RpcServer.getCurrentCall();
+ HBaseProtos.VersionInfo versionInfo = call != null ? call.getClientVersionInfo() : null;
+ return hasMinimumVersion(versionInfo, major, minor);
+ }
+
+ public static boolean hasMinimumVersion(HBaseProtos.VersionInfo versionInfo,
+ int major,
+ int minor) {
+ if (versionInfo != null) {
+ try {
+ String[] components = versionInfo.getVersion().split("\\.");
+
+ int clientMajor = components.length > 0 ? Integer.parseInt(components[0]) : 0;
+ if (clientMajor != major) {
+ return clientMajor > major;
+ }
+
+ int clientMinor = components.length > 1 ? Integer.parseInt(components[1]) : 0;
+ return clientMinor >= minor;
+ } catch (NumberFormatException e) {
+ return false;
+ }
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
index d276503..05bebb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.ipc;
+import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.UnknownScannerException;
@@ -105,6 +106,8 @@ public class MetricsHBaseServer {
source.notServingRegionException();
} else if (throwable instanceof FailedSanityCheckException) {
source.failedSanityException();
+ } else if (throwable instanceof MultiActionResultTooLarge) {
+ source.multiActionTooLargeException();
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
index 976b508..3e38dbf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
@@ -19,10 +19,11 @@ package org.apache.hadoop.hbase.ipc;
import java.net.InetAddress;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.security.User;
-
+@InterfaceAudience.Private
public interface RpcCallContext extends Delayable {
/**
* Check if the caller who made this IPC call has disconnected.
@@ -40,7 +41,7 @@ public interface RpcCallContext extends Delayable {
* support cellblocks while fielding requests from clients that do not.
* @return True if the client supports cellblocks, else return all content in pb
*/
- boolean isClientCellBlockSupport();
+ boolean isClientCellBlockSupported();
/**
* Returns the user credentials associated with the current RPC request or
@@ -63,4 +64,22 @@ public interface RpcCallContext extends Delayable {
* @return the client version info, or null if the information is not present
*/
VersionInfo getClientVersionInfo();
+
+ boolean isRetryImmediatelySupported();
+
+ /**
+ * The size of response cells that have been accumulated so far.
+ * This along with the corresponding increment call is used to ensure that multi's or
+ * scans dont get too excessively large
+ */
+ long getResponseCellSize();
+
+ /**
+ * Add on the given amount to the retained cell size.
+ *
+ * This is not thread safe and not synchronized at all. If this is used by more than one thread
+ * then everything will break. Since this is called for every row synchronization would be too
+ * onerous.
+ */
+ void incrementResponseCellSize(long cellSize);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index cd38bd7..06d8ca9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
import org.apache.hadoop.hbase.client.Operation;
+import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
@@ -316,6 +317,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private User user;
private InetAddress remoteAddress;
+ private long responseCellSize = 0;
+ private boolean retryImmediatelySupported;
+
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, Responder responder,
long size, TraceInfo tinfo, final InetAddress remoteAddress) {
@@ -335,6 +339,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.tinfo = tinfo;
this.user = connection.user;
this.remoteAddress = remoteAddress;
+ this.retryImmediatelySupported = connection.retryImmediatelySupported;
}
/**
@@ -511,7 +516,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
@Override
- public boolean isClientCellBlockSupport() {
+ public boolean isClientCellBlockSupported() {
return this.connection != null && this.connection.codec != null;
}
@@ -528,6 +533,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return this.size;
}
+ public long getResponseCellSize() {
+ return responseCellSize;
+ }
+
+ public void incrementResponseCellSize(long cellSize) {
+ responseCellSize += cellSize;
+ }
+
/**
* If we have a response, and delay is not set, then respond
* immediately. Otherwise, do not respond to client. This is
@@ -563,6 +576,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
public VersionInfo getClientVersionInfo() {
return connection.getVersionInfo();
}
+
+
+ @Override
+ public boolean isRetryImmediatelySupported() {
+ return retryImmediatelySupported;
+ }
}
/** Listens on the socket. Creates jobs for the handler threads*/
@@ -1248,6 +1267,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// was authentication allowed with a fallback to simple auth
private boolean authenticatedWithFallback;
+ private boolean retryImmediatelySupported = false;
+
public UserGroupInformation attemptingUser = null; // user name before auth
protected User user = null;
protected UserGroupInformation ugi = null;
@@ -1704,6 +1725,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
if (connectionHeader.hasVersionInfo()) {
+ // see if this connection will support RetryImmediatelyException
+ retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
+
AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
+ " with version info: "
+ TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
@@ -1711,6 +1735,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
+ " with unknown version info");
}
+
+
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
index 052386a..b13e44d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
@@ -24,10 +24,8 @@ import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
/**
* Latch used by the Master to have the prepare() sync behaviour for old
@@ -44,24 +42,7 @@ public abstract class ProcedurePrepareLatch {
}
public static boolean hasProcedureSupport() {
- return currentClientHasMinimumVersion(1, 1);
- }
-
- private static boolean currentClientHasMinimumVersion(int major, int minor) {
- RpcCallContext call = RpcServer.getCurrentCall();
- VersionInfo versionInfo = call != null ? call.getClientVersionInfo() : null;
- if (versionInfo != null) {
- String[] components = versionInfo.getVersion().split("\\.");
-
- int clientMajor = components.length > 0 ? Integer.parseInt(components[0]) : 0;
- if (clientMajor != major) {
- return clientMajor > major;
- }
-
- int clientMinor = components.length > 1 ? Integer.parseInt(components[1]) : 0;
- return clientMinor >= minor;
- }
- return false;
+ return VersionInfoUtil.currentClientHasMinimumVersion(1, 1);
}
protected abstract void countDown(final Procedure proc);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 00d20aa..f8f2268 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -367,7 +368,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private boolean isClientCellBlockSupport() {
RpcCallContext context = RpcServer.getCurrentCall();
- return context != null && context.isClientCellBlockSupport();
+ return context != null && context.isClientCellBlockSupported();
}
private void addResult(final MutateResponse.Builder builder,
@@ -426,13 +427,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
rm = new RowMutations(action.getMutation().getRow().toByteArray());
}
switch (type) {
- case PUT:
- rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
- break;
- case DELETE:
- rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
- break;
- default:
+ case PUT:
+ rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
+ break;
+ case DELETE:
+ rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
+ break;
+ default:
throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
}
}
@@ -469,14 +470,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
rm = new RowMutations(action.getMutation().getRow().toByteArray());
}
switch (type) {
- case PUT:
- rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
- break;
- case DELETE:
- rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
- break;
- default:
- throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
+ case PUT:
+ rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
+ break;
+ case DELETE:
+ rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
+ break;
+ default:
+ throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
}
}
return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
@@ -577,10 +578,43 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// ResultOrException instance that matches each Put or Delete is then added down in the
// doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched
List<ClientProtos.Action> mutations = null;
- for (ClientProtos.Action action: actions.getActionList()) {
+ long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
+ RpcCallContext context = RpcServer.getCurrentCall();
+ IOException sizeIOE = null;
+ for (ClientProtos.Action action : actions.getActionList()) {
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
try {
Result r = null;
+
+ if (context != null
+ && context.isRetryImmediatelySupported()
+ && context.getResponseCellSize() > maxQuotaResultSize) {
+
+ // We're storing the exception since the exception and reason string won't
+ // change after the response size limit is reached.
+ if (sizeIOE == null ) {
+ // We don't need the stack un-winding do don't throw the exception.
+ // Throwing will kill the JVM's JIT.
+ //
+ // Instead just create the exception and then store it.
+ sizeIOE = new MultiActionResultTooLarge("Max response size exceeded: "
+ + context.getResponseCellSize());
+
+ // Only report the exception once since there's only one request that
+ // caused the exception. Otherwise this number will dominate the exceptions count.
+ rpcServer.getMetrics().exception(sizeIOE);
+ }
+
+ // Now that there's an exception is know to be created
+ // use it for the response.
+ //
+ // This will create a copy in the builder.
+ resultOrExceptionBuilder = ResultOrException.newBuilder().
+ setException(ResponseConverter.buildException(sizeIOE));
+ resultOrExceptionBuilder.setIndex(action.getIndex());
+ builder.addResultOrException(resultOrExceptionBuilder.build());
+ continue;
+ }
if (action.hasGet()) {
Get get = ProtobufUtil.toGet(action.getGet());
r = region.get(get);
@@ -633,11 +667,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (isClientCellBlockSupport()) {
pbResult = ProtobufUtil.toResultNoData(r);
// Hard to guess the size here. Just make a rough guess.
- if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
+ if (cellsToReturn == null) {
+ cellsToReturn = new ArrayList<CellScannable>();
+ }
cellsToReturn.add(r);
} else {
pbResult = ProtobufUtil.toResult(r);
}
+ if (context != null) {
+ context.incrementResponseCellSize(Result.getTotalSizeOfCells(r));
+ }
resultOrExceptionBuilder =
ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
}
@@ -719,8 +758,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
case SUCCESS:
builder.addResultOrException(getResultOrException(
- ClientProtos.Result.getDefaultInstance(), index,
- ((HRegion)region).getRegionStats()));
+ ClientProtos.Result.getDefaultInstance(), index,
+ ((HRegion) region).getRegionStats()));
break;
}
}
@@ -869,13 +908,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
try {
rpcServer = new RpcServer(rs, name, getServices(),
- bindAddress, // use final bindAddress for this server.
- rs.conf,
- rpcSchedulerFactory.create(rs.conf, this, rs));
- } catch(BindException be) {
+ bindAddress, // use final bindAddress for this server.
+ rs.conf,
+ rpcSchedulerFactory.create(rs.conf, this, rs));
+ } catch (BindException be) {
String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT :
- HConstants.REGIONSERVER_PORT;
- throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
+ HConstants.REGIONSERVER_PORT;
+ throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
"' configuration property.", be.getCause() != null ? be.getCause() : be);
}
@@ -2004,7 +2043,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// It is also the conduit via which we pass back data.
PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
- if (controller != null) controller.setCellScanner(null);
+ if (controller != null) {
+ controller.setCellScanner(null);
+ }
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
@@ -2070,7 +2111,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
}
- if (processed != null) responseBuilder.setProcessed(processed);
+ if (processed != null) {
+ responseBuilder.setProcessed(processed);
+ }
return responseBuilder.build();
}
@@ -2087,10 +2130,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
// It is also the conduit via which we pass back data.
PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
- CellScanner cellScanner = controller != null? controller.cellScanner(): null;
+ CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
OperationQuota quota = null;
// Clear scanner so we are not holding on to reference across call.
- if (controller != null) controller.setCellScanner(null);
+ if (controller != null) {
+ controller.setCellScanner(null);
+ }
try {
checkOpen();
requestCount.increment();
@@ -2243,6 +2288,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
boolean moreResults = true;
boolean closeScanner = false;
boolean isSmallScan = false;
+ RpcCallContext context = RpcServer.getCurrentCall();
ScanResponse.Builder builder = ScanResponse.newBuilder();
if (request.hasCloseScanner()) {
closeScanner = request.getCloseScanner();
@@ -2323,8 +2369,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// where processing of request takes > lease expiration time.
lease = regionServer.leases.removeLease(scannerName);
List<Result> results = new ArrayList<Result>();
- long totalCellSize = 0;
- long currentScanResultSize = 0;
boolean done = false;
// Call coprocessor. Get region info from scanner.
@@ -2334,8 +2378,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (!results.isEmpty()) {
for (Result r : results) {
for (Cell cell : r.rawCells()) {
- totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
- currentScanResultSize += CellUtil.estimatedHeapSizeOfWithoutTags(cell);
+ if (context != null) {
+ context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
+ }
}
}
}
@@ -2368,7 +2413,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// If the coprocessor host is adding to the result list, we cannot guarantee the
// correct ordering of partial results and so we prevent partial results from being
// formed.
- boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0;
+ boolean serverGuaranteesOrderOfPartials = results.isEmpty();
boolean allowPartialResults =
clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
boolean moreRows = false;
@@ -2435,7 +2480,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (!values.isEmpty()) {
for (Cell cell : values) {
- totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
+ if (context != null) {
+ context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
+ }
}
final boolean partial = scannerContext.partialResultFormed();
results.add(Result.create(values, null, stale, partial));
@@ -2490,9 +2537,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
region.updateReadRequestsCount(i);
- region.getMetrics().updateScanNext(totalCellSize);
+ long responseCellSize = context != null ? context.getResponseCellSize() : 0;
+ region.getMetrics().updateScanNext(responseCellSize);
if (regionServer.metricsRegionServer != null) {
- regionServer.metricsRegionServer.updateScannerNext(totalCellSize);
+ regionServer.metricsRegionServer.updateScannerNext(responseCellSize);
}
} finally {
region.closeRegionOperation();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
new file mode 100644
index 0000000..47dd7be
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.CompatibilityFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.hadoop.hbase.test.MetricsAssertHelper;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.TestCase.assertEquals;
+
+/**
+ * This test sets the multi size WAAAAAY low and then checks to make sure that gets will still make
+ * progress.
+ */
+@Category({MediumTests.class, ClientTests.class})
+public class TestMultiRespectsLimits {
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final MetricsAssertHelper METRICS_ASSERT =
+ CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+ private final static byte[] FAMILY = Bytes.toBytes("D");
+ public static final int MAX_SIZE = 500;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setLong(
+ HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
+ MAX_SIZE);
+
+ // Only start on regionserver so that all regions are on the same server.
+ TEST_UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testMultiLimits() throws Exception {
+ final TableName name = TableName.valueOf("testMultiLimits");
+ Table t = TEST_UTIL.createTable(name, FAMILY);
+ TEST_UTIL.loadTable(t, FAMILY, false);
+
+ // Split the table to make sure that the chunking happens accross regions.
+ try (final Admin admin = TEST_UTIL.getHBaseAdmin()) {
+ admin.split(name);
+ TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return admin.getTableRegions(name).size() > 1;
+ }
+ });
+ }
+ List<Get> gets = new ArrayList<>(MAX_SIZE);
+
+ for (int i = 0; i < MAX_SIZE; i++) {
+ gets.add(new Get(HBaseTestingUtility.ROWS[i]));
+ }
+ Result[] results = t.get(gets);
+ assertEquals(MAX_SIZE, results.length);
+ RpcServerInterface rpcServer = TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer();
+ BaseSource s = rpcServer.getMetrics().getMetricsSource();
+
+ // Cells from TEST_UTIL.loadTable have a length of 27.
+ // Multiplying by less than that gives an easy lower bound on size.
+ // However in reality each kv is being reported as much higher than that.
+ METRICS_ASSERT.assertCounterGt("exceptions", (MAX_SIZE * 25) / MAX_SIZE, s);
+ METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge",
+ (MAX_SIZE * 25) / MAX_SIZE, s);
+ }
+}