You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2018/02/07 00:46:51 UTC

hbase git commit: HBASE-19900 Region-level exception destroy the result of batch

Repository: hbase
Updated Branches:
  refs/heads/master a5b86dd77 -> d8b999e69


HBASE-19900 Region-level exception destroy the result of batch


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d8b999e6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d8b999e6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d8b999e6

Branch: refs/heads/master
Commit: d8b999e6950bc01e8aab8ecce437e710e4a98e15
Parents: a5b86dd
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Tue Feb 6 05:33:37 2018 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Wed Feb 7 08:41:56 2018 +0800

----------------------------------------------------------------------
 .../client/AsyncBatchRpcRetryingCaller.java     |  29 +--
 .../hbase/client/AsyncRequestFutureImpl.java    | 243 +++++++++---------
 .../TestAsyncProcessWithRegionException.java    | 252 +++++++++++++++++++
 .../client/TestMalformedCellFromClient.java     | 173 +++++++++++++
 4 files changed, 558 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d8b999e6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 62ee0ab..51b89a9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -251,8 +251,8 @@ class AsyncBatchRpcRetryingCaller<T> {
 
   @SuppressWarnings("unchecked")
   private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
-      RegionResult regionResult, List<Action> failedActions) {
-    Object result = regionResult.result.get(action.getOriginalIndex());
+      RegionResult regionResult, List<Action> failedActions, Throwable regionException) {
+    Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
     if (result == null) {
       LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
           + Bytes.toStringBinary(action.getAction().getRow()) + "' of "
@@ -279,27 +279,28 @@ class AsyncBatchRpcRetryingCaller<T> {
     List<Action> failedActions = new ArrayList<>();
     actionsByRegion.forEach((rn, regionReq) -> {
       RegionResult regionResult = resp.getResults().get(rn);
+      Throwable regionException = resp.getException(rn);
       if (regionResult != null) {
         regionReq.actions.forEach(
-          action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions));
+          action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions,
+            regionException));
       } else {
-        Throwable t = resp.getException(rn);
         Throwable error;
-        if (t == null) {
+        if (regionException == null) {
           LOG.error(
             "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
           error = new RuntimeException("Invalid response");
         } else {
-          error = translateException(t);
-          logException(tries, () -> Stream.of(regionReq), error, serverName);
-          conn.getLocator().updateCachedLocation(regionReq.loc, error);
-          if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
-            failAll(regionReq.actions.stream(), tries, error, serverName);
-            return;
-          }
-          addError(regionReq.actions, error, serverName);
-          failedActions.addAll(regionReq.actions);
+          error = translateException(regionException);
+        }
+        logException(tries, () -> Stream.of(regionReq), error, serverName);
+        conn.getLocator().updateCachedLocation(regionReq.loc, error);
+        if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+          failAll(regionReq.actions.stream(), tries, error, serverName);
+          return;
         }
+        addError(regionReq.actions, error, serverName);
+        failedActions.addAll(regionReq.actions);
       }
     });
     if (!failedActions.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8b999e6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 3ab94c5..ace74f9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -19,8 +19,6 @@
 
 package org.apache.hadoop.hbase.client;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -36,28 +34,29 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.RetryImmediatelyException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.trace.TraceUtil;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.htrace.core.Tracer;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 
 /**
  * The context, and return value, for a single submit/submitAll call.
@@ -152,7 +151,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
         if (results[index] != null) return;
         // We set the number of calls here. After that any path must call setResult/setError.
         // True even for replicas that are not found - if we refuse to send we MUST set error.
-        results[index] = new ReplicaResultState(locs.length);
+        updateResult(index, new ReplicaResultState(locs.length));
       }
       for (int i = 1; i < locs.length; ++i) {
         Action replicaAction = new Action(action, i);
@@ -234,7 +233,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
         } else {
           if (results != null) {
             SingleResponse singleResponse = (SingleResponse) res;
-            results[0] = singleResponse.getEntry();
+            updateResult(0, singleResponse.getEntry());
           }
           decActionCounter(1);
         }
@@ -706,27 +705,17 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
     Retry canRetry = errorsByServer.canTryMore(numAttempt)
         ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
 
-    if (tableName == null && ClientExceptionsUtil.isMetaClearingException(t)) {
-      // tableName is null when we made a cross-table RPC call.
-      asyncProcess.connection.clearCaches(server);
-    }
-    int failed = 0, stopped = 0;
+    cleanServerCache(server, t);
+    int failed = 0;
+    int stopped = 0;
     List<Action> toReplay = new ArrayList<>();
     for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
       byte[] regionName = e.getKey();
-      byte[] row = e.getValue().iterator().next().getAction().getRow();
+      byte[] row = e.getValue().get(0).getAction().getRow();
       // Do not use the exception for updating cache because it might be coming from
       // any of the regions in the MultiAction.
-      try {
-        if (tableName != null) {
-          asyncProcess.connection.updateCachedLocations(tableName, regionName, row,
-              ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
-        }
-      } catch (Throwable ex) {
-        // That should never happen, but if it did, we want to make sure
-        // we still process errors
-        LOG.error("Couldn't update cached region locations: " + ex);
-      }
+      updateCachedLocations(server, regionName, row,
+        ClientExceptionsUtil.isMetaClearingException(t) ? null : t);
       for (Action action : e.getValue()) {
         Retry retry = manageError(
             action.getOriginalIndex(), action.getAction(), canRetry, t, server);
@@ -819,6 +808,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
                                   ServerName server, MultiResponse responses, int numAttempt) {
     assert responses != null;
 
+    Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
+    updateStats(server, results);
+
     // Success or partial success
     // Analyze detailed results. We can still have individual failures to be redo.
     // two specific throwables are managed:
@@ -826,126 +818,111 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
     //  - RegionMovedException: we update the cache with the new region location
 
     List<Action> toReplay = new ArrayList<>();
-    Throwable throwable = null;
+    Throwable lastException = null;
     int failureCount = 0;
-    boolean canRetry = true;
-
-    Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
-    updateStats(server, results);
-
-    int failed = 0, stopped = 0;
+    int failed = 0;
+    int stopped = 0;
+    Retry retry = null;
     // Go by original action.
     for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) {
       byte[] regionName = regionEntry.getKey();
-      Map<Integer, Object> regionResults = results.get(regionName) == null
-          ?  null : results.get(regionName).result;
-      if (regionResults == null) {
-        if (!responses.getExceptions().containsKey(regionName)) {
-          LOG.error("Server sent us neither results nor exceptions for "
-              + Bytes.toStringBinary(regionName));
-          responses.getExceptions().put(regionName, new RuntimeException("Invalid response"));
-        }
-        continue;
-      }
+
+      Throwable regionException = responses.getExceptions().get(regionName);
+      cleanServerCache(server, regionException);
+
+      Map<Integer, Object> regionResults =
+        results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap();
       boolean regionFailureRegistered = false;
       for (Action sentAction : regionEntry.getValue()) {
         Object result = regionResults.get(sentAction.getOriginalIndex());
+        if (result == null) {
+          if (regionException == null) {
+            LOG.error("Server sent us neither results nor exceptions for "
+              + Bytes.toStringBinary(regionName)
+              + ", numAttempt:" + numAttempt);
+            regionException = new RuntimeException("Invalid response");
+          }
+          // If the row operation encounters the region-lever error, the exception of action may be
+          // null.
+          result = regionException;
+        }
         // Failure: retry if it's make sense else update the errors lists
-        if (result == null || result instanceof Throwable) {
+        if (result instanceof Throwable) {
+          Throwable actionException = (Throwable) result;
           Row row = sentAction.getAction();
-          throwable = ClientExceptionsUtil.findException(result);
+          lastException = regionException != null ? regionException
+            : ClientExceptionsUtil.findException(actionException);
           // Register corresponding failures once per server/once per region.
           if (!regionFailureRegistered) {
             regionFailureRegistered = true;
-            try {
-              asyncProcess.connection.updateCachedLocations(
-                  tableName, regionName, row.getRow(), result, server);
-            } catch (Throwable ex) {
-              // That should never happen, but if it did, we want to make sure
-              // we still process errors
-              LOG.error("Couldn't update cached region locations: " + ex);
-            }
+            updateCachedLocations(server, regionName, row.getRow(), actionException);
           }
-          if (failureCount == 0) {
+          if (retry == null) {
             errorsByServer.reportServerError(server);
             // We determine canRetry only once for all calls, after reporting server failure.
-            canRetry = errorsByServer.canTryMore(numAttempt);
+            retry = errorsByServer.canTryMore(numAttempt) ?
+              Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
           }
           ++failureCount;
-          Retry retry = manageError(sentAction.getOriginalIndex(), row,
-              canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server);
-          if (retry == Retry.YES) {
-            toReplay.add(sentAction);
-          } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
-            ++stopped;
-          } else {
-            ++failed;
+          switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException,
+            server)) {
+            case YES:
+              toReplay.add(sentAction);
+              break;
+            case NO_OTHER_SUCCEEDED:
+              ++stopped;
+              break;
+            default:
+              ++failed;
+              break;
           }
         } else {
-          if (callback != null) {
-            try {
-              //noinspection unchecked
-              // TODO: would callback expect a replica region name if it gets one?
-              this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result);
-            } catch (Throwable t) {
-              LOG.error("User callback threw an exception for "
-                  + Bytes.toStringBinary(regionName) + ", ignoring", t);
-            }
-          }
+          invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result);
           setResult(sentAction, result);
         }
       }
     }
+    if (toReplay.isEmpty()) {
+      logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped);
+    } else {
+      resubmit(server, toReplay, numAttempt, failureCount, lastException);
+    }
+  }
 
-    // The failures global to a region. We will use for multiAction we sent previously to find the
-    //   actions to replay.
-    for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
-      throwable = throwableEntry.getValue();
-      byte[] region = throwableEntry.getKey();
-      List<Action> actions = multiAction.actions.get(region);
-      if (actions == null || actions.isEmpty()) {
-        throw new IllegalStateException("Wrong response for the region: " +
-            HRegionInfo.encodeRegionName(region));
-      }
-
-      if (failureCount == 0) {
-        errorsByServer.reportServerError(server);
-        canRetry = errorsByServer.canTryMore(numAttempt);
-      }
-      if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) {
-        // For multi-actions, we don't have a table name, but we want to make sure to clear the
-        // cache in case there were location-related exceptions. We don't to clear the cache
-        // for every possible exception that comes through, however.
-        asyncProcess.connection.clearCaches(server);
-      } else {
-        try {
-          asyncProcess.connection.updateCachedLocations(
-              tableName, region, actions.get(0).getAction().getRow(), throwable, server);
-        } catch (Throwable ex) {
-          // That should never happen, but if it did, we want to make sure
-          // we still process errors
-          LOG.error("Couldn't update cached region locations: " + ex);
-        }
-      }
-      failureCount += actions.size();
+  private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row,
+    Throwable rowException) {
+    if (tableName == null) {
+      return;
+    }
+    try {
+      asyncProcess.connection
+        .updateCachedLocations(tableName, regionName, row, rowException, server);
+    } catch (Throwable ex) {
+      // That should never happen, but if it did, we want to make sure
+      // we still process errors
+      LOG.error("Couldn't update cached region locations: " + ex);
+    }
+  }
 
-      for (Action action : actions) {
-        Row row = action.getAction();
-        Retry retry = manageError(action.getOriginalIndex(), row,
-            canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
-        if (retry == Retry.YES) {
-          toReplay.add(action);
-        } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
-          ++stopped;
-        } else {
-          ++failed;
-        }
+  private void invokeCallBack(byte[] regionName, byte[] row, CResult result) {
+    if (callback != null) {
+      try {
+        //noinspection unchecked
+        // TODO: would callback expect a replica region name if it gets one?
+        this.callback.update(regionName, row, result);
+      } catch (Throwable t) {
+        LOG.error("User callback threw an exception for "
+          + Bytes.toStringBinary(regionName) + ", ignoring", t);
       }
     }
-    if (toReplay.isEmpty()) {
-      logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
-    } else {
-      resubmit(server, toReplay, numAttempt, failureCount, throwable);
+  }
+
+  private void cleanServerCache(ServerName server, Throwable regionException) {
+    if (tableName == null && ClientExceptionsUtil.isMetaClearingException(regionException)) {
+      // For multi-actions, we don't have a table name, but we want to make sure to clear the
+      // cache in case there were location-related exceptions. We don't to clear the cache
+      // for every possible exception that comes through, however.
+      asyncProcess.connection.clearCaches(server);
     }
   }
 
@@ -1041,7 +1018,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
       if (results[index] != state) {
         throw new AssertionError("We set the callCount but someone else replaced the result");
       }
-      results[index] = result;
+      updateResult(index, result);
     }
 
     decActionCounter(index);
@@ -1099,7 +1076,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
         if (results[index] != state) {
           throw new AssertionError("We set the callCount but someone else replaced the result");
         }
-        results[index] = throwable;
+        updateResult(index, throwable);
       }
       decActionCounter(index);
     }
@@ -1130,7 +1107,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
       if (isFromReplica) {
         throw new AssertionError("Unexpected stale result for " + row);
       }
-      results[index] = result;
+      updateResult(index, result);
     } else {
       synchronized (replicaResultLock) {
         resObj = results[index];
@@ -1138,7 +1115,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
           if (isFromReplica) {
             throw new AssertionError("Unexpected stale result for " + row);
           }
-          results[index] = result;
+          updateResult(index, result);
         }
       }
     }
@@ -1276,4 +1253,20 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
     return new MultiServerCallable(asyncProcess.connection, tableName, server,
         multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority());
   }
+
+  private void updateResult(int index, Object result) {
+    Object current = results[index];
+    if (current != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The result is assigned repeatedly! current:" + current
+          + ", new:" + result);
+      }
+    }
+    results[index] = result;
+  }
+
+  @VisibleForTesting
+  long getNumberOfActionsInProgress() {
+    return actionsInProgress.get();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8b999e6/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
new file mode 100644
index 0000000..c46385e
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * The purpose of this test is to make sure the region exception won't corrupt the results
+ * of batch. The prescription is shown below.
+ * 1) honor the action result rather than region exception. If the action have both of true result
+ * and region exception, the action is fine as the exception is caused by other actions
+ * which are in the same region.
+ * 2) honor the action exception rather than region exception. If the action have both of action
+ * exception and region exception, we deal with the action exception only. If we also
+ * handle the region exception for the same action, it will introduce the negative count of
+ * actions in progress. The AsyncRequestFuture#waitUntilDone will block forever.
+ *
+ * This bug can be reproduced by real use case. see TestMalformedCellFromClient(in branch-1.4+).
+ * It uses the batch of RowMutations to present the bug. Given that the batch of RowMutations is
+ * only supported by branch-1.4+, perhaps the branch-1.3 and branch-1.2 won't encounter this issue.
+ * We still backport the fix to branch-1.3 and branch-1.2 in case we ignore some write paths.
+ */
+@Category({ ClientTests.class, SmallTests.class })
+public class TestAsyncProcessWithRegionException {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncProcessWithRegionException.class);
+
+  private static final Result EMPTY_RESULT = Result.create(null, true);
+  private static final IOException IOE = new IOException("YOU CAN'T PASS");
+  private static final Configuration CONF = new Configuration();
+  private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
+  private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW");
+  private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW");
+  private static final byte[] BAD_ROW_WITHOUT_ACTION_EXCEPTION =
+    Bytes.toBytes("BAD_ROW_WITHOUT_ACTION_EXCEPTION");
+  private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
+  private static final ServerName SERVER_NAME = ServerName.valueOf("s1,1,1");
+  private static final RegionInfo REGION_INFO =
+    RegionInfoBuilder.newBuilder(DUMMY_TABLE)
+      .setStartKey(HConstants.EMPTY_START_ROW)
+      .setEndKey(HConstants.EMPTY_END_ROW)
+      .setSplit(false)
+      .setRegionId(1)
+      .build();
+
+  private static final HRegionLocation REGION_LOCATION =
+    new HRegionLocation(REGION_INFO, SERVER_NAME);
+
+  @BeforeClass
+  public static void setUpBeforeClass() {
+    // disable the retry
+    CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
+  }
+
+  @Test
+  public void testSuccessivePut() throws Exception {
+    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
+
+    List<Put> puts = new ArrayList<>(1);
+    puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
+    final int expectedSize = puts.size();
+    AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
+    arf.waitUntilDone();
+    Object[] result = arf.getResults();
+    assertEquals(expectedSize, result.length);
+    for (Object r : result) {
+      assertEquals(Result.class, r.getClass());
+    }
+    assertTrue(puts.isEmpty());
+    assertActionsInProgress(arf);
+  }
+
+  @Test
+  public void testFailedPut() throws Exception {
+    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
+
+    List<Put> puts = new ArrayList<>(2);
+    puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
+    // this put should fail
+    puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
+    final int expectedSize = puts.size();
+
+    AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
+    arf.waitUntilDone();
+    // There is a failed puts
+    assertError(arf, 1);
+    Object[] result = arf.getResults();
+    assertEquals(expectedSize, result.length);
+    assertEquals(Result.class, result[0].getClass());
+    assertTrue(result[1] instanceof IOException);
+    assertTrue(puts.isEmpty());
+    assertActionsInProgress(arf);
+  }
+
+  @Test
+  public void testFailedPutWithoutActionException() throws Exception {
+    MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
+
+    List<Put> puts = new ArrayList<>(3);
+    puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
+    // this put should fail
+    puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
+    // this put should fail, and it won't have action exception
+    puts.add(new Put(BAD_ROW_WITHOUT_ACTION_EXCEPTION).addColumn(FAMILY, FAMILY, FAMILY));
+    final int expectedSize = puts.size();
+
+    AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
+    arf.waitUntilDone();
+    // There are two failed puts
+    assertError(arf, 2);
+    Object[] result = arf.getResults();
+    assertEquals(expectedSize, result.length);
+    assertEquals(Result.class, result[0].getClass());
+    assertTrue(result[1] instanceof IOException);
+    assertTrue(result[2] instanceof IOException);
+    assertTrue(puts.isEmpty());
+    assertActionsInProgress(arf);
+  }
+
+  private static void assertError(AsyncRequestFuture arf, int expectedCountOfFailure) {
+    assertTrue(arf.hasError());
+    RetriesExhaustedWithDetailsException e = arf.getErrors();
+    List<Throwable> errors = e.getCauses();
+    assertEquals(expectedCountOfFailure, errors.size());
+    for (Throwable t : errors) {
+      assertTrue(t instanceof IOException);
+    }
+  }
+
+  private static void assertActionsInProgress(AsyncRequestFuture arf) {
+    if (arf instanceof AsyncRequestFutureImpl) {
+      assertEquals(0, ((AsyncRequestFutureImpl) arf).getNumberOfActionsInProgress());
+    }
+  }
+
+  private static ClusterConnection createHConnection() throws IOException {
+    ClusterConnection hc = Mockito.mock(ClusterConnection.class);
+    NonceGenerator ng = Mockito.mock(NonceGenerator.class);
+    Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
+    Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
+    Mockito.when(hc.getConfiguration()).thenReturn(CONF);
+    Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF));
+    setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION));
+    setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION));
+    Mockito
+      .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean()))
+      .thenReturn(Collections.singletonList(REGION_LOCATION));
+    return hc;
+  }
+
+  private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result)
+    throws IOException {
+    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
+      Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
+    Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
+      Mockito.anyBoolean())).thenReturn(result);
+  }
+
+  private static class MyAsyncProcess extends AsyncProcess {
+    private final ExecutorService service = Executors.newFixedThreadPool(5);
+
+    MyAsyncProcess(ClusterConnection hc, Configuration conf) {
+      super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
+    }
+
+    public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)
+      throws InterruptedIOException {
+      return submit(AsyncProcessTask.newBuilder()
+        .setPool(service)
+        .setTableName(tableName)
+        .setRowAccess(rows)
+        .setSubmittedRows(AsyncProcessTask.SubmittedRows.NORMAL)
+        .setNeedResults(true)
+        .setRpcTimeout(HConstants.DEFAULT_HBASE_RPC_TIMEOUT)
+        .setOperationTimeout(HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)
+        .build());
+    }
+
+    @Override
+    protected RpcRetryingCaller<AbstractResponse> createCaller(
+      CancellableRegionServerCallable callable, int rpcTimeout) {
+      MultiServerCallable callable1 = (MultiServerCallable) callable;
+      MultiResponse mr = new MultiResponse();
+      callable1.getMulti().actions.forEach((regionName, actions) -> {
+        actions.forEach(action -> {
+          if (Bytes.equals(action.getAction().getRow(), GOOD_ROW)) {
+            mr.add(regionName, action.getOriginalIndex(), EMPTY_RESULT);
+          } else if (Bytes.equals(action.getAction().getRow(), BAD_ROW)) {
+            mr.add(regionName, action.getOriginalIndex(), IOE);
+          }
+        });
+      });
+      mr.addException(REGION_INFO.getRegionName(), IOE);
+      return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0, 9) {
+        @Override
+        public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
+          int callTimeout) {
+          try {
+            // sleep one second in order for threadpool to start another thread instead of reusing
+            // existing one.
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            // pass
+          }
+          return mr;
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d8b999e6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
new file mode 100644
index 0000000..e44a2e9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * The purpose of this test is to make sure the region exception won't corrupt the results
+ * of batch. The prescription is shown below.
+ * 1) honor the action result rather than region exception. If the action have both of true result
+ * and region exception, the action is fine as the exception is caused by other actions
+ * which are in the same region.
+ * 2) honor the action exception rather than region exception. If the action have both of action
+ * exception and region exception, we deal with the action exception only. If we also
+ * handle the region exception for the same action, it will introduce the negative count of
+ * actions in progress. The AsyncRequestFuture#waitUntilDone will block forever.
+ *
+ * The no-cluster test is in TestAsyncProcessWithRegionException.
+ */
+@Category({ MediumTests.class, ClientTests.class })
+public class TestMalformedCellFromClient {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMalformedCellFromClient.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static final int CELL_SIZE = 100;
+  private static final TableName TABLE_NAME = TableName.valueOf("TestMalformedCellFromClient");
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // disable the retry
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @Before
+  public void before() throws Exception {
+    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME)
+      .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
+      .setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE)).build();
+    TEST_UTIL.getConnection().getAdmin().createTable(desc);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
+      TEST_UTIL.deleteTable(htd.getTableName());
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * The purpose of this ut is to check the consistency between the exception and results.
+   * If the RetriesExhaustedWithDetailsException contains the whole batch,
+   * each result should be of IOE. Otherwise, the row operation which is not in the exception
+   * should have a true result.
+   */
+  @Test
+  public void testRegionException() throws InterruptedException, IOException {
+    List<Row> batches = new ArrayList<>();
+    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
+    // the rm is used to prompt the region exception.
+    // see RSRpcServices#multi
+    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
+    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
+    batches.add(rm);
+    Object[] results = new Object[batches.size()];
+
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+      Throwable exceptionByCaught = null;
+      try {
+        table.batch(batches, results);
+        fail("Where is the exception? We put the malformed cells!!!");
+      } catch (RetriesExhaustedWithDetailsException e) {
+        for (Throwable throwable : e.getCauses()) {
+          assertNotNull(throwable);
+        }
+        assertEquals(1, e.getNumExceptions());
+        exceptionByCaught = e.getCause(0);
+      }
+      for (Object obj : results) {
+        assertNotNull(obj);
+      }
+      assertEquals(Result.class, results[0].getClass());
+      assertEquals(exceptionByCaught.getClass(), results[1].getClass());
+      Result result = table.get(new Get(Bytes.toBytes("good")));
+      assertEquals(1, result.size());
+      Cell cell = result.getColumnLatestCell(FAMILY, null);
+      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
+    }
+  }
+
+  /**
+   * The purpose of this ut is to check the consistency between the exception and results.
+   * If the RetriesExhaustedWithDetailsException contains the whole batch,
+   * each result should be of IOE. Otherwise, the row operation which is not in the exception
+   * should have a true result.
+   */
+  @Test
+  public void testRegionExceptionByAsync() throws Exception {
+    List<Row> batches = new ArrayList<>();
+    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
+    // the rm is used to prompt the region exception.
+    // see RSRpcServices#multi
+    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
+    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
+    batches.add(rm);
+    try (AsyncConnection asyncConnection = ConnectionFactory
+      .createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
+      AsyncTable<AdvancedScanResultConsumer> table = asyncConnection.getTable(TABLE_NAME);
+      List<CompletableFuture<AdvancedScanResultConsumer>> results = table.batch(batches);
+      assertEquals(2, results.size());
+      try {
+        results.get(1).get();
+        fail("Where is the exception? We put the malformed cells!!!");
+      } catch (ExecutionException e) {
+        // pass
+      }
+      Result result = table.get(new Get(Bytes.toBytes("good"))).get();
+      assertEquals(1, result.size());
+      Cell cell = result.getColumnLatestCell(FAMILY, null);
+      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
+    }
+  }
+}