Posted to commits@hbase.apache.org by ec...@apache.org on 2015/12/11 03:34:37 UTC

hbase git commit: HBASE-14946 Don't allow multis to overrun the max result size.

Repository: hbase
Updated Branches:
  refs/heads/branch-1.2 512144ed2 -> 8953da28c


HBASE-14946 Don't allow multis to overrun the max result size.

Summary:
* Add VersionInfoUtil to determine if a client has a specified version or better
* Add an exception type to signal that the response should be chunked (sketched below)
* Add client-side knowledge of retry-immediately exceptions
* Add metrics for how often this happens
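
For orientation, here is a condensed, self-contained sketch of the server-side behaviour
this change adds. It is illustrative only and not the committed code (the real logic is in
the RSRpcServices hunk further down); the class name, the MAX_RESPONSE_CELL_SIZE constant
and the 27-bytes-per-result figure are hypothetical stand-ins.

import java.util.ArrayList;
import java.util.List;

public class MultiChunkingSketch {
  // Stand-in for the effective limit (scanner max result size / read quota).
  static final long MAX_RESPONSE_CELL_SIZE = 500;

  public static void main(String[] args) {
    boolean clientSupportsRetryImmediately = true; // only true for 1.2+ clients
    long responseCellSize = 0;
    List<Integer> returned = new ArrayList<>();
    List<Integer> toRetryImmediately = new ArrayList<>();

    for (int action = 0; action < 100; action++) {
      if (clientSupportsRetryImmediately && responseCellSize > MAX_RESPONSE_CELL_SIZE) {
        // Over the limit: stop accumulating results and hand back an exception
        // (MultiActionResultTooLarge in the real code) so the client retries the
        // remaining actions right away, with no backoff and no attempt consumed.
        toRetryImmediately.add(action);
        continue;
      }
      returned.add(action);
      responseCellSize += 27; // pretend each result carries ~27 bytes of cells
    }
    System.out.println(returned.size() + " results returned, "
        + toRetryImmediately.size() + " actions to retry immediately");
  }
}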

Test Plan: Added a unit test

Differential Revision: https://reviews.facebook.net/D51771


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8953da28
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8953da28
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8953da28

Branch: refs/heads/branch-1.2
Commit: 8953da28cb3ddc22a56661b35657aaa68f445a7a
Parents: 512144e
Author: Elliott Clark <ec...@apache.org>
Authored: Mon Dec 7 18:33:35 2015 -0800
Committer: Elliott Clark <ec...@apache.org>
Committed: Thu Dec 10 18:32:06 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/MultiActionResultTooLarge.java |  31 +++++
 .../hadoop/hbase/RetryImmediatelyException.java |  27 ++++
 .../hadoop/hbase/client/AsyncProcess.java       |  89 +++++++++----
 .../hadoop/hbase/client/ConnectionManager.java  |   4 +-
 .../org/apache/hadoop/hbase/client/Result.java  |   3 +
 .../hbase/ipc/MetricsHBaseServerSource.java     |   8 +-
 .../hbase/ipc/MetricsHBaseServerSourceImpl.java |   9 ++
 .../hadoop/hbase/client/VersionInfoUtil.java    |  63 ++++++++++
 .../hadoop/hbase/ipc/MetricsHBaseServer.java    |   3 +
 .../apache/hadoop/hbase/ipc/RpcCallContext.java |  23 +++-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |  28 ++++-
 .../master/procedure/ProcedurePrepareLatch.java |  23 +---
 .../hbase/regionserver/RSRpcServices.java       | 124 +++++++++++++------
 .../hbase/client/TestMultiRespectsLimits.java   | 102 +++++++++++++++
 14 files changed, 449 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java
new file mode 100644
index 0000000..d06eea1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MultiActionResultTooLarge.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+/**
+ * Exception thrown when the result needs to be chunked on the server side.
+ * It signals that retries should happen right away and should not count against the
+ * number of retries, because part of the multi request already succeeded.
+ */
+public class MultiActionResultTooLarge extends RetryImmediatelyException {
+
+  public MultiActionResultTooLarge(String s) {
+    super(s);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java
new file mode 100644
index 0000000..1b39904
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RetryImmediatelyException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+public class RetryImmediatelyException extends IOException {
+  public RetryImmediatelyException(String s) {
+    super(s);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 3d55efc..0d093b1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.RetryImmediatelyException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -126,19 +127,36 @@ class AsyncProcess {
     public void waitUntilDone() throws InterruptedIOException;
   }
 
-  /** Return value from a submit that didn't contain any requests. */
+  /**
+   * Return value from a submit that didn't contain any requests.
+   */
   private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
-    public final Object[] result = new Object[0];
+
+    final Object[] result = new Object[0];
+
     @Override
-    public boolean hasError() { return false; }
+    public boolean hasError() {
+      return false;
+    }
+
     @Override
-    public RetriesExhaustedWithDetailsException getErrors() { return null; }
+    public RetriesExhaustedWithDetailsException getErrors() {
+      return null;
+    }
+
     @Override
-    public List<? extends Row> getFailedOperations() { return null; }
+    public List<? extends Row> getFailedOperations() {
+      return null;
+    }
+
     @Override
-    public Object[] getResults() { return result; }
+    public Object[] getResults() {
+      return result;
+    }
+
     @Override
-    public void waitUntilDone() throws InterruptedIOException {}
+    public void waitUntilDone() throws InterruptedIOException {
+    }
   };
 
   /** Sync point for calls to multiple replicas for the same user request (Get).
@@ -306,8 +324,12 @@ class AsyncProcess {
    *         RuntimeException
    */
   private ExecutorService getPool(ExecutorService pool) {
-    if (pool != null) return pool;
-    if (this.pool != null) return this.pool;
+    if (pool != null) {
+      return pool;
+    }
+    if (this.pool != null) {
+      return this.pool;
+    }
     throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
   }
 
@@ -365,7 +387,9 @@ class AsyncProcess {
         Row r = it.next();
         HRegionLocation loc;
         try {
-          if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
+          if (r == null) {
+            throw new IllegalArgumentException("#" + id + ", row cannot be null");
+          }
           // Make sure we get 0-s replica.
           RegionLocations locs = connection.locateRegion(
               tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
@@ -728,10 +752,10 @@ class AsyncProcess {
           // Normal case: we received an answer from the server, and it's not an exception.
           receiveMultiAction(multiAction, server, res, numAttempt);
         } catch (Throwable t) {
-              // Something really bad happened. We are on the send thread that will now die.
-              LOG.error("Internal AsyncProcess #" + id + " error for "
-                  + tableName + " processing for " + server, t);
-              throw new RuntimeException(t);
+          // Something really bad happened. We are on the send thread that will now die.
+          LOG.error("Internal AsyncProcess #" + id + " error for "
+              + tableName + " processing for " + server, t);
+          throw new RuntimeException(t);
         } finally {
           decTaskCounters(multiAction.getRegions(), server);
           if (callsInProgress != null && callable != null) {
@@ -750,19 +774,25 @@ class AsyncProcess {
 
     private final TableName tableName;
     private final AtomicLong actionsInProgress = new AtomicLong(-1);
-    /** The lock controls access to results. It is only held when populating results where
+    /**
+     * The lock controls access to results. It is only held when populating results where
      * there might be several callers (eventual consistency gets). For other requests,
-     * there's one unique call going on per result index. */
+     * there's one unique call going on per result index.
+     */
     private final Object replicaResultLock = new Object();
-    /** Result array.  Null if results are not needed. Otherwise, each index corresponds to
+    /**
+     * Result array.  Null if results are not needed. Otherwise, each index corresponds to
      * the action index in initial actions submitted. For most request types, has null-s for
      * requests that are not done, and result/exception for those that are done.
      * For eventual-consistency gets, initially the same applies; at some point, replica calls
      * might be started, and ReplicaResultState is put at the corresponding indices. The
      * returning calls check the type to detect when this is the case. After all calls are done,
-     * ReplicaResultState-s are replaced with results for the user. */
+     * ReplicaResultState-s are replaced with results for the user.
+     */
     private final Object[] results;
-    /** Indices of replica gets in results. If null, all or no actions are replica-gets. */
+    /**
+     * Indices of replica gets in results. If null, all or no actions are replica-gets.
+     */
     private final int[] replicaGetIndices;
     private final boolean hasAnyReplicaGets;
     private final long nonceGroup;
@@ -777,7 +807,9 @@ class AsyncProcess {
       this.actionsInProgress.set(actions.size());
       if (results != null) {
         assert needResults;
-        if (results.length != actions.size()) throw new AssertionError("results.length");
+        if (results.length != actions.size()) {
+          throw new AssertionError("results.length");
+        }
         this.results = results;
         for (int i = 0; i != this.results.length; ++i) {
           results[i] = null;
@@ -1177,9 +1209,13 @@ class AsyncProcess {
       // We have two contradicting needs here:
       //  1) We want to get the new location after having slept, as it may change.
       //  2) We want to take into account the location when calculating the sleep time.
+      //  3) If all this is just because the response needed to be chunked try again FAST.
       // It should be possible to have some heuristics to take the right decision. Short term,
       //  we go for one.
-      long backOffTime = errorsByServer.calculateBackoffTime(oldServer, pause);
+      boolean retryImmediately = throwable instanceof RetryImmediatelyException;
+      int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
+      long backOffTime = retryImmediately ? 0 :
+          errorsByServer.calculateBackoffTime(oldServer, pause);
       if (numAttempt > startLogErrorsCnt) {
         // We use this value to have some logs when we have multiple failures, but not too many
         //  logs, as errors are to be expected when a region moves, splits and so on
@@ -1188,14 +1224,16 @@ class AsyncProcess {
       }
 
       try {
-        Thread.sleep(backOffTime);
+        if (backOffTime > 0) {
+          Thread.sleep(backOffTime);
+        }
       } catch (InterruptedException e) {
         LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
         Thread.currentThread().interrupt();
         return;
       }
 
-      groupAndSendMultiAction(toReplay, numAttempt + 1);
+      groupAndSendMultiAction(toReplay, nextAttemptNumber);
     }
 
     private void logNoResubmit(ServerName oldServer, int numAttempt,
@@ -1255,6 +1293,7 @@ class AsyncProcess {
           // Failure: retry if it makes sense, else update the errors lists
           if (result == null || result instanceof Throwable) {
             Row row = sentAction.getAction();
+            throwable = ConnectionManager.findException(result);
             // Register corresponding failures once per server/once per region.
             if (!regionFailureRegistered) {
               regionFailureRegistered = true;
@@ -1404,7 +1443,9 @@ class AsyncProcess {
       // will either see state with callCount 0 after locking it; or will not see state at all
       // we will replace it with the result.
       synchronized (state) {
-        if (state.callCount == 0) return; // someone already set the result
+        if (state.callCount == 0) {
+          return; // someone already set the result
+        }
         state.callCount = 0;
       }
       synchronized (replicaResultLock) {
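
The AsyncProcess hunks above make retries triggered by a RetryImmediatelyException skip
the backoff sleep and avoid consuming an attempt. Below is a standalone illustration of
that decision; it is a sketch under stated assumptions, not the committed code, and the
exponential pause is a hypothetical stand-in for errorsByServer.calculateBackoffTime().

public class RetryBackoffSketch {
  // Stand-in for org.apache.hadoop.hbase.RetryImmediatelyException.
  static class RetryImmediatelyException extends java.io.IOException {
  }

  /** Returns {nextAttemptNumber, backOffTimeMs} for a failed multi call. */
  static long[] planRetry(Throwable error, int numAttempt, long pauseMs) {
    boolean retryImmediately = error instanceof RetryImmediatelyException;
    int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
    // Hypothetical exponential backoff standing in for the per-server error tracker.
    long backOffTime = retryImmediately ? 0 : pauseMs * (1L << Math.min(numAttempt, 6));
    return new long[] { nextAttemptNumber, backOffTime };
  }

  public static void main(String[] args) {
    System.out.println(java.util.Arrays.toString(
        planRetry(new RetryImmediatelyException(), 3, 100)));      // [3, 0]
    System.out.println(java.util.Arrays.toString(
        planRetry(new java.io.IOException("region moved"), 3, 100))); // [4, 800]
  }
}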

http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index b68aa8b..8d24874 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.RetryImmediatelyException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotEnabledException;
@@ -2701,7 +2702,8 @@ class ConnectionManager {
     Throwable cur = (Throwable) exception;
     while (cur != null) {
       if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
-          || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException) {
+          || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException
+          || cur instanceof RetryImmediatelyException) {
         return cur;
       }
       if (cur instanceof RemoteException) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index d295953..e764c4e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -913,6 +913,9 @@ public class Result implements CellScannable, CellScanner {
    */
   public static long getTotalSizeOfCells(Result result) {
     long size = 0;
+    if (result.isEmpty()) {
+      return size;
+    }
     for (Cell c : result.rawCells()) {
       size += CellUtil.estimatedHeapSizeOf(c);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 5cf71f3..061a672 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -74,6 +74,9 @@ public interface MetricsHBaseServerSource extends BaseSource {
   String EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException";
   String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException";
   String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException";
+  String EXCEPTIONS_MULTI_TOO_LARGE_NAME = "exceptions.multiResponseTooLarge";
+  String EXCEPTIONS_MULTI_TOO_LARGE_DESC = "A response to a multi request was too large and the " +
+      "rest of the requests will have to be retried.";
 
   void authorizationSuccess();
 
@@ -96,6 +99,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
   void notServingRegionException();
   void unknownScannerException();
   void tooBusyException();
+  void multiActionTooLargeException();
 
   void sentBytes(long count);
 
@@ -110,4 +114,6 @@ public interface MetricsHBaseServerSource extends BaseSource {
   void processedCall(int processingTime);
 
   void queuedAndProcessedCall(int totalTime);
-  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index 8984394..487f9f5 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
 public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
     implements MetricsHBaseServerSource {
 
+
   private final MetricsHBaseServerWrapper wrapper;
   private final MutableCounterLong authorizationSuccesses;
   private final MutableCounterLong authorizationFailures;
@@ -47,6 +48,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
   private final MutableCounterLong exceptionsSanity;
   private final MutableCounterLong exceptionsNSRE;
   private final MutableCounterLong exceptionsMoved;
+  private final MutableCounterLong exceptionsMultiTooLarge;
 
 
   private MutableHistogram queueCallTime;
@@ -81,6 +83,8 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
         .newCounter(EXCEPTIONS_MOVED_NAME, EXCEPTIONS_TYPE_DESC, 0L);
     this.exceptionsNSRE = this.getMetricsRegistry()
         .newCounter(EXCEPTIONS_NSRE_NAME, EXCEPTIONS_TYPE_DESC, 0L);
+    this.exceptionsMultiTooLarge = this.getMetricsRegistry()
+        .newCounter(EXCEPTIONS_MULTI_TOO_LARGE_NAME, EXCEPTIONS_MULTI_TOO_LARGE_DESC, 0L);
 
     this.authenticationSuccesses = this.getMetricsRegistry().newCounter(
         AUTHENTICATION_SUCCESSES_NAME, AUTHENTICATION_SUCCESSES_DESC, 0L);
@@ -160,6 +164,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
   }
 
   @Override
+  public void multiActionTooLargeException() {
+    exceptionsMultiTooLarge.incr();
+  }
+
+  @Override
   public void authenticationSuccess() {
     authenticationSuccesses.incr();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
new file mode 100644
index 0000000..c405518
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+
+
+/**
+ * Class to help with parsing the version info.
+ */
+@InterfaceAudience.Private
+public final class VersionInfoUtil {
+
+  private VersionInfoUtil() {
+    /* UTIL CLASS ONLY */
+  }
+
+  public static boolean currentClientHasMinimumVersion(int major, int minor) {
+    RpcCallContext call = RpcServer.getCurrentCall();
+    HBaseProtos.VersionInfo versionInfo = call != null ? call.getClientVersionInfo() : null;
+    return hasMinimumVersion(versionInfo, major, minor);
+  }
+
+  public static boolean hasMinimumVersion(HBaseProtos.VersionInfo versionInfo,
+                                          int major,
+                                          int minor) {
+    if (versionInfo != null) {
+      try {
+        String[] components = versionInfo.getVersion().split("\\.");
+
+        int clientMajor = components.length > 0 ? Integer.parseInt(components[0]) : 0;
+        if (clientMajor != major) {
+          return clientMajor > major;
+        }
+
+        int clientMinor = components.length > 1 ? Integer.parseInt(components[1]) : 0;
+        return clientMinor >= minor;
+      } catch (NumberFormatException e) {
+        return false;
+      }
+    }
+    return false;
+  }
+}
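
A quick behavioural example of the comparison above: the helper below mirrors the
hasMinimumVersion() logic but takes a plain version string instead of a
HBaseProtos.VersionInfo so it runs on its own. It is a sketch, not part of the patch.

public class VersionCheckSketch {
  static boolean hasMinimumVersion(String version, int major, int minor) {
    try {
      String[] components = version.split("\\.");

      int clientMajor = components.length > 0 ? Integer.parseInt(components[0]) : 0;
      if (clientMajor != major) {
        return clientMajor > major; // a newer major version always qualifies
      }

      int clientMinor = components.length > 1 ? Integer.parseInt(components[1]) : 0;
      return clientMinor >= minor;
    } catch (NumberFormatException e) {
      return false; // unparseable versions are treated as too old
    }
  }

  public static void main(String[] args) {
    System.out.println(hasMinimumVersion("1.2.0", 1, 2));          // true
    System.out.println(hasMinimumVersion("1.1.3", 1, 2));          // false
    System.out.println(hasMinimumVersion("2.0.0-SNAPSHOT", 1, 2)); // true
  }
}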

http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
index d276503..05bebb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import org.apache.hadoop.hbase.MultiActionResultTooLarge;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.UnknownScannerException;
@@ -105,6 +106,8 @@ public class MetricsHBaseServer {
         source.notServingRegionException();
       } else if (throwable instanceof FailedSanityCheckException) {
         source.failedSanityException();
+      } else if (throwable instanceof MultiActionResultTooLarge) {
+        source.multiActionTooLargeException();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
index 976b508..3e38dbf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
@@ -19,10 +19,11 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.net.InetAddress;
 
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
 import org.apache.hadoop.hbase.security.User;
 
-
+@InterfaceAudience.Private
 public interface RpcCallContext extends Delayable {
   /**
    * Check if the caller who made this IPC call has disconnected.
@@ -40,7 +41,7 @@ public interface RpcCallContext extends Delayable {
    * support cellblocks while fielding requests from clients that do not.
    * @return True if the client supports cellblocks, else return all content in pb
    */
-  boolean isClientCellBlockSupport();
+  boolean isClientCellBlockSupported();
 
   /**
    * Returns the user credentials associated with the current RPC request or
@@ -63,4 +64,22 @@ public interface RpcCallContext extends Delayable {
    * @return the client version info, or null if the information is not present
    */
   VersionInfo getClientVersionInfo();
+
+  boolean isRetryImmediatelySupported();
+
+  /**
+   * The size of response cells that have been accumulated so far.
+   * This, along with the corresponding increment call, is used to ensure that multis or
+   * scans don't get excessively large.
+   */
+  long getResponseCellSize();
+
+  /**
+   * Add the given amount to the retained cell size.
+   *
+   * This is not thread safe and not synchronized at all. If this is used by more than one
+   * thread, everything will break. Since this is called for every row, synchronization
+   * would be too onerous.
+   */
+  void incrementResponseCellSize(long cellSize);
 }
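
The two size-accounting methods added above are deliberately unsynchronized because a
single handler thread services each call. A toy holder making that contract concrete
(illustrative only; the real implementation is the Call class inside RpcServer below):

public class ResponseSizeSketch {
  private long responseCellSize = 0;

  public long getResponseCellSize() {
    return responseCellSize;
  }

  // Unsynchronized on purpose: one handler thread per call, invoked for every row,
  // so locking here would be needless overhead.
  public void incrementResponseCellSize(long cellSize) {
    responseCellSize += cellSize;
  }

  public static void main(String[] args) {
    ResponseSizeSketch ctx = new ResponseSizeSketch();
    for (int row = 0; row < 10; row++) {
      ctx.incrementResponseCellSize(27); // pretend each row contributes 27 bytes of cells
    }
    System.out.println(ctx.getResponseCellSize() + " bytes accumulated"); // 270 bytes accumulated
  }
}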

http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index cd38bd7..06d8ca9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
 import org.apache.hadoop.hbase.client.Operation;
+import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
@@ -316,6 +317,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     private User user;
     private InetAddress remoteAddress;
 
+    private long responseCellSize = 0;
+    private boolean retryImmediatelySupported;
+
     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
          Message param, CellScanner cellScanner, Connection connection, Responder responder,
          long size, TraceInfo tinfo, final InetAddress remoteAddress) {
@@ -335,6 +339,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       this.tinfo = tinfo;
       this.user = connection.user;
       this.remoteAddress = remoteAddress;
+      this.retryImmediatelySupported = connection.retryImmediatelySupported;
     }
 
     /**
@@ -511,7 +516,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     }
 
     @Override
-    public boolean isClientCellBlockSupport() {
+    public boolean isClientCellBlockSupported() {
       return this.connection != null && this.connection.codec != null;
     }
 
@@ -528,6 +533,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return this.size;
     }
 
+    public long getResponseCellSize() {
+      return responseCellSize;
+    }
+
+    public void incrementResponseCellSize(long cellSize) {
+      responseCellSize += cellSize;
+    }
+
     /**
      * If we have a response, and delay is not set, then respond
      * immediately.  Otherwise, do not respond to client.  This is
@@ -563,6 +576,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     public VersionInfo getClientVersionInfo() {
       return connection.getVersionInfo();
     }
+
+
+    @Override
+    public boolean isRetryImmediatelySupported() {
+      return retryImmediatelySupported;
+    }
   }
 
   /** Listens on the socket. Creates jobs for the handler threads*/
@@ -1248,6 +1267,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     // was authentication allowed with a fallback to simple auth
     private boolean authenticatedWithFallback;
 
+    private boolean retryImmediatelySupported = false;
+
     public UserGroupInformation attemptingUser = null; // user name before auth
     protected User user = null;
     protected UserGroupInformation ugi = null;
@@ -1704,6 +1725,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         }
       }
       if (connectionHeader.hasVersionInfo()) {
+        // see if this connection will support RetryImmediatelyException
+        retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
+
         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
             + " with version info: "
             + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
@@ -1711,6 +1735,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
             + " with unknown version info");
       }
+
+
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
index 052386a..b13e44d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
@@ -24,10 +24,8 @@ import java.util.concurrent.CountDownLatch;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
 
 /**
  * Latch used by the Master to have the prepare() sync behaviour for old
@@ -44,24 +42,7 @@ public abstract class ProcedurePrepareLatch {
   }
 
   public static boolean hasProcedureSupport() {
-    return currentClientHasMinimumVersion(1, 1);
-  }
-
-  private static boolean currentClientHasMinimumVersion(int major, int minor) {
-    RpcCallContext call = RpcServer.getCurrentCall();
-    VersionInfo versionInfo = call != null ? call.getClientVersionInfo() : null;
-    if (versionInfo != null) {
-      String[] components = versionInfo.getVersion().split("\\.");
-
-      int clientMajor = components.length > 0 ? Integer.parseInt(components[0]) : 0;
-      if (clientMajor != major) {
-        return clientMajor > major;
-      }
-
-      int clientMinor = components.length > 1 ? Integer.parseInt(components[1]) : 0;
-      return clientMinor >= minor;
-    }
-    return false;
+    return VersionInfoUtil.currentClientHasMinimumVersion(1, 1);
   }
 
   protected abstract void countDown(final Procedure proc);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 00d20aa..f8f2268 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.MultiActionResultTooLarge;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -367,7 +368,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private boolean isClientCellBlockSupport() {
     RpcCallContext context = RpcServer.getCurrentCall();
-    return context != null && context.isClientCellBlockSupport();
+    return context != null && context.isClientCellBlockSupported();
   }
 
   private void addResult(final MutateResponse.Builder builder,
@@ -426,13 +427,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         rm = new RowMutations(action.getMutation().getRow().toByteArray());
       }
       switch (type) {
-      case PUT:
-        rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
-        break;
-      case DELETE:
-        rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
-        break;
-      default:
+        case PUT:
+          rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
+          break;
+        case DELETE:
+          rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
+          break;
+        default:
           throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
       }
     }
@@ -469,14 +470,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         rm = new RowMutations(action.getMutation().getRow().toByteArray());
       }
       switch (type) {
-      case PUT:
-        rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
-        break;
-      case DELETE:
-        rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
-        break;
-      default:
-        throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
+        case PUT:
+          rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
+          break;
+        case DELETE:
+          rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
+          break;
+        default:
+          throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
       }
     }
     return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
@@ -577,10 +578,43 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     // ResultOrException instance that matches each Put or Delete is then added down in the
     // doBatchOp call.  We should be staying aligned though the Put and Delete are deferred/batched
     List<ClientProtos.Action> mutations = null;
-    for (ClientProtos.Action action: actions.getActionList()) {
+    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
+    RpcCallContext context = RpcServer.getCurrentCall();
+    IOException sizeIOE = null;
+    for (ClientProtos.Action action : actions.getActionList()) {
       ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
       try {
         Result r = null;
+
+        if (context != null
+            && context.isRetryImmediatelySupported()
+            && context.getResponseCellSize() > maxQuotaResultSize) {
+
+          // We're storing the exception since the exception and reason string won't
+          // change after the response size limit is reached.
+          if (sizeIOE == null ) {
+            // We don't need the stack unwinding, so don't throw the exception.
+            // Throwing will kill the JVM's JIT.
+            //
+            // Instead just create the exception and then store it.
+            sizeIOE = new MultiActionResultTooLarge("Max response size exceeded: "
+                    + context.getResponseCellSize());
+
+            // Only report the exception once since there's only one request that
+            // caused the exception. Otherwise this number will dominate the exceptions count.
+            rpcServer.getMetrics().exception(sizeIOE);
+          }
+
+          // Now that the exception is known to be created,
+          // use it for the response.
+          //
+          // This will create a copy in the builder.
+          resultOrExceptionBuilder = ResultOrException.newBuilder().
+              setException(ResponseConverter.buildException(sizeIOE));
+          resultOrExceptionBuilder.setIndex(action.getIndex());
+          builder.addResultOrException(resultOrExceptionBuilder.build());
+          continue;
+        }
         if (action.hasGet()) {
           Get get = ProtobufUtil.toGet(action.getGet());
           r = region.get(get);
@@ -633,11 +667,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           if (isClientCellBlockSupport()) {
             pbResult = ProtobufUtil.toResultNoData(r);
             //  Hard to guess the size here.  Just make a rough guess.
-            if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
+            if (cellsToReturn == null) {
+              cellsToReturn = new ArrayList<CellScannable>();
+            }
             cellsToReturn.add(r);
           } else {
             pbResult = ProtobufUtil.toResult(r);
           }
+          if (context != null) {
+            context.incrementResponseCellSize(Result.getTotalSizeOfCells(r));
+          }
           resultOrExceptionBuilder =
             ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
         }
@@ -719,8 +758,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
           case SUCCESS:
             builder.addResultOrException(getResultOrException(
-              ClientProtos.Result.getDefaultInstance(), index,
-                ((HRegion)region).getRegionStats()));
+                ClientProtos.Result.getDefaultInstance(), index,
+                ((HRegion) region).getRegionStats()));
             break;
         }
       }
@@ -869,13 +908,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
     try {
       rpcServer = new RpcServer(rs, name, getServices(),
-        bindAddress, // use final bindAddress for this server.
-        rs.conf,
-        rpcSchedulerFactory.create(rs.conf, this, rs));
-    } catch(BindException be) {
+          bindAddress, // use final bindAddress for this server.
+          rs.conf,
+          rpcSchedulerFactory.create(rs.conf, this, rs));
+    } catch (BindException be) {
       String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT :
-        HConstants.REGIONSERVER_PORT;
-        throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
+          HConstants.REGIONSERVER_PORT;
+      throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
           "' configuration property.", be.getCause() != null ? be.getCause() : be);
     }
 
@@ -2004,7 +2043,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     // It is also the conduit via which we pass back data.
     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
     CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
-    if (controller != null) controller.setCellScanner(null);
+    if (controller != null) {
+      controller.setCellScanner(null);
+    }
 
     long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
 
@@ -2070,7 +2111,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
       controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
     }
-    if (processed != null) responseBuilder.setProcessed(processed);
+    if (processed != null) {
+      responseBuilder.setProcessed(processed);
+    }
     return responseBuilder.build();
   }
 
@@ -2087,10 +2130,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
     // It is also the conduit via which we pass back data.
     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
-    CellScanner cellScanner = controller != null? controller.cellScanner(): null;
+    CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
     OperationQuota quota = null;
     // Clear scanner so we are not holding on to reference across call.
-    if (controller != null) controller.setCellScanner(null);
+    if (controller != null) {
+      controller.setCellScanner(null);
+    }
     try {
       checkOpen();
       requestCount.increment();
@@ -2243,6 +2288,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       boolean moreResults = true;
       boolean closeScanner = false;
       boolean isSmallScan = false;
+      RpcCallContext context = RpcServer.getCurrentCall();
       ScanResponse.Builder builder = ScanResponse.newBuilder();
       if (request.hasCloseScanner()) {
         closeScanner = request.getCloseScanner();
@@ -2323,8 +2369,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           // where processing of request takes > lease expiration time.
           lease = regionServer.leases.removeLease(scannerName);
           List<Result> results = new ArrayList<Result>();
-          long totalCellSize = 0;
-          long currentScanResultSize = 0;
 
           boolean done = false;
           // Call coprocessor. Get region info from scanner.
@@ -2334,8 +2378,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             if (!results.isEmpty()) {
               for (Result r : results) {
                 for (Cell cell : r.rawCells()) {
-                  totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
-                  currentScanResultSize += CellUtil.estimatedHeapSizeOfWithoutTags(cell);
+                  if (context != null) {
+                    context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
+                  }
                 }
               }
             }
@@ -2368,7 +2413,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                 // If the coprocessor host is adding to the result list, we cannot guarantee the
                 // correct ordering of partial results and so we prevent partial results from being
                 // formed.
-                boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0;
+                boolean serverGuaranteesOrderOfPartials = results.isEmpty();
                 boolean allowPartialResults =
                     clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
                 boolean moreRows = false;
@@ -2435,7 +2480,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
                   if (!values.isEmpty()) {
                     for (Cell cell : values) {
-                      totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
+                      if (context != null) {
+                        context.incrementResponseCellSize(CellUtil.estimatedSerializedSizeOf(cell));
+                      }
                     }
                     final boolean partial = scannerContext.partialResultFormed();
                     results.add(Result.create(values, null, stale, partial));
@@ -2490,9 +2537,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                 }
               }
               region.updateReadRequestsCount(i);
-              region.getMetrics().updateScanNext(totalCellSize);
+              long responseCellSize = context != null ? context.getResponseCellSize() : 0;
+              region.getMetrics().updateScanNext(responseCellSize);
               if (regionServer.metricsRegionServer != null) {
-                regionServer.metricsRegionServer.updateScannerNext(totalCellSize);
+                regionServer.metricsRegionServer.updateScannerNext(responseCellSize);
               }
             } finally {
               region.closeRegionOperation();

http://git-wip-us.apache.org/repos/asf/hbase/blob/8953da28/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
new file mode 100644
index 0000000..47dd7be
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.CompatibilityFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.hadoop.hbase.test.MetricsAssertHelper;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.TestCase.assertEquals;
+
+/**
+ * This test sets the multi size very low and then checks to make sure that gets will
+ * still make progress.
+ */
+@Category({MediumTests.class, ClientTests.class})
+public class TestMultiRespectsLimits {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final MetricsAssertHelper METRICS_ASSERT =
+      CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+  private final static byte[] FAMILY = Bytes.toBytes("D");
+  public static final int MAX_SIZE = 500;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setLong(
+        HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
+        MAX_SIZE);
+
+    // Only start one regionserver so that all regions are on the same server.
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMultiLimits() throws Exception {
+    final TableName name = TableName.valueOf("testMultiLimits");
+    Table t = TEST_UTIL.createTable(name, FAMILY);
+    TEST_UTIL.loadTable(t, FAMILY, false);
+
+    // Split the table to make sure that the chunking happens across regions.
+    try (final Admin admin = TEST_UTIL.getHBaseAdmin()) {
+      admin.split(name);
+      TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return admin.getTableRegions(name).size() > 1;
+        }
+      });
+    }
+    List<Get> gets = new ArrayList<>(MAX_SIZE);
+
+    for (int i = 0; i < MAX_SIZE; i++) {
+      gets.add(new Get(HBaseTestingUtility.ROWS[i]));
+    }
+    Result[] results = t.get(gets);
+    assertEquals(MAX_SIZE, results.length);
+    RpcServerInterface rpcServer = TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer();
+    BaseSource s = rpcServer.getMetrics().getMetricsSource();
+
+    // Cells from TEST_UTIL.loadTable have a length of 27.
+    // Multiplying by less than that gives an easy lower bound on size.
+    // However, in reality each KV is reported as much larger than that.
+    METRICS_ASSERT.assertCounterGt("exceptions", (MAX_SIZE * 25) / MAX_SIZE, s);
+    METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge",
+        (MAX_SIZE * 25) / MAX_SIZE, s);
+  }
+}