You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2018/02/07 00:46:51 UTC
hbase git commit: HBASE-19900 Region-level exception destroy the
result of batch
Repository: hbase
Updated Branches:
refs/heads/master a5b86dd77 -> d8b999e69
HBASE-19900 Region-level exception destroy the result of batch
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d8b999e6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d8b999e6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d8b999e6
Branch: refs/heads/master
Commit: d8b999e6950bc01e8aab8ecce437e710e4a98e15
Parents: a5b86dd
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Tue Feb 6 05:33:37 2018 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Wed Feb 7 08:41:56 2018 +0800
----------------------------------------------------------------------
.../client/AsyncBatchRpcRetryingCaller.java | 29 +--
.../hbase/client/AsyncRequestFutureImpl.java | 243 +++++++++---------
.../TestAsyncProcessWithRegionException.java | 252 +++++++++++++++++++
.../client/TestMalformedCellFromClient.java | 173 +++++++++++++
4 files changed, 558 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d8b999e6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 62ee0ab..51b89a9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -251,8 +251,8 @@ class AsyncBatchRpcRetryingCaller<T> {
@SuppressWarnings("unchecked")
private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
- RegionResult regionResult, List<Action> failedActions) {
- Object result = regionResult.result.get(action.getOriginalIndex());
+ RegionResult regionResult, List<Action> failedActions, Throwable regionException) {
+ Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
if (result == null) {
LOG.error("Server " + serverName + " sent us neither result nor exception for row '"
+ Bytes.toStringBinary(action.getAction().getRow()) + "' of "
@@ -279,27 +279,28 @@ class AsyncBatchRpcRetryingCaller<T> {
List<Action> failedActions = new ArrayList<>();
actionsByRegion.forEach((rn, regionReq) -> {
RegionResult regionResult = resp.getResults().get(rn);
+ Throwable regionException = resp.getException(rn);
if (regionResult != null) {
regionReq.actions.forEach(
- action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions));
+ action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions,
+ regionException));
} else {
- Throwable t = resp.getException(rn);
Throwable error;
- if (t == null) {
+ if (regionException == null) {
LOG.error(
"Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
error = new RuntimeException("Invalid response");
} else {
- error = translateException(t);
- logException(tries, () -> Stream.of(regionReq), error, serverName);
- conn.getLocator().updateCachedLocation(regionReq.loc, error);
- if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
- failAll(regionReq.actions.stream(), tries, error, serverName);
- return;
- }
- addError(regionReq.actions, error, serverName);
- failedActions.addAll(regionReq.actions);
+ error = translateException(regionException);
+ }
+ logException(tries, () -> Stream.of(regionReq), error, serverName);
+ conn.getLocator().updateCachedLocation(regionReq.loc, error);
+ if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
+ failAll(regionReq.actions.stream(), tries, error, serverName);
+ return;
}
+ addError(regionReq.actions, error, serverName);
+ failedActions.addAll(regionReq.actions);
}
});
if (!failedActions.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/d8b999e6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 3ab94c5..ace74f9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.hbase.client;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
@@ -36,28 +34,29 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.trace.TraceUtil;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.core.Tracer;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
/**
* The context, and return value, for a single submit/submitAll call.
@@ -152,7 +151,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
if (results[index] != null) return;
// We set the number of calls here. After that any path must call setResult/setError.
// True even for replicas that are not found - if we refuse to send we MUST set error.
- results[index] = new ReplicaResultState(locs.length);
+ updateResult(index, new ReplicaResultState(locs.length));
}
for (int i = 1; i < locs.length; ++i) {
Action replicaAction = new Action(action, i);
@@ -234,7 +233,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
} else {
if (results != null) {
SingleResponse singleResponse = (SingleResponse) res;
- results[0] = singleResponse.getEntry();
+ updateResult(0, singleResponse.getEntry());
}
decActionCounter(1);
}
@@ -706,27 +705,17 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
Retry canRetry = errorsByServer.canTryMore(numAttempt)
? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
- if (tableName == null && ClientExceptionsUtil.isMetaClearingException(t)) {
- // tableName is null when we made a cross-table RPC call.
- asyncProcess.connection.clearCaches(server);
- }
- int failed = 0, stopped = 0;
+ cleanServerCache(server, t);
+ int failed = 0;
+ int stopped = 0;
List<Action> toReplay = new ArrayList<>();
for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
byte[] regionName = e.getKey();
- byte[] row = e.getValue().iterator().next().getAction().getRow();
+ byte[] row = e.getValue().get(0).getAction().getRow();
// Do not use the exception for updating cache because it might be coming from
// any of the regions in the MultiAction.
- try {
- if (tableName != null) {
- asyncProcess.connection.updateCachedLocations(tableName, regionName, row,
- ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
- }
- } catch (Throwable ex) {
- // That should never happen, but if it did, we want to make sure
- // we still process errors
- LOG.error("Couldn't update cached region locations: " + ex);
- }
+ updateCachedLocations(server, regionName, row,
+ ClientExceptionsUtil.isMetaClearingException(t) ? null : t);
for (Action action : e.getValue()) {
Retry retry = manageError(
action.getOriginalIndex(), action.getAction(), canRetry, t, server);
@@ -819,6 +808,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
ServerName server, MultiResponse responses, int numAttempt) {
assert responses != null;
+ Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
+ updateStats(server, results);
+
// Success or partial success
// Analyze detailed results. We can still have individual failures to be redo.
// two specific throwables are managed:
@@ -826,126 +818,111 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
// - RegionMovedException: we update the cache with the new region location
List<Action> toReplay = new ArrayList<>();
- Throwable throwable = null;
+ Throwable lastException = null;
int failureCount = 0;
- boolean canRetry = true;
-
- Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
- updateStats(server, results);
-
- int failed = 0, stopped = 0;
+ int failed = 0;
+ int stopped = 0;
+ Retry retry = null;
// Go by original action.
for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) {
byte[] regionName = regionEntry.getKey();
- Map<Integer, Object> regionResults = results.get(regionName) == null
- ? null : results.get(regionName).result;
- if (regionResults == null) {
- if (!responses.getExceptions().containsKey(regionName)) {
- LOG.error("Server sent us neither results nor exceptions for "
- + Bytes.toStringBinary(regionName));
- responses.getExceptions().put(regionName, new RuntimeException("Invalid response"));
- }
- continue;
- }
+
+ Throwable regionException = responses.getExceptions().get(regionName);
+ cleanServerCache(server, regionException);
+
+ Map<Integer, Object> regionResults =
+ results.containsKey(regionName) ? results.get(regionName).result : Collections.emptyMap();
boolean regionFailureRegistered = false;
for (Action sentAction : regionEntry.getValue()) {
Object result = regionResults.get(sentAction.getOriginalIndex());
+ if (result == null) {
+ if (regionException == null) {
+ LOG.error("Server sent us neither results nor exceptions for "
+ + Bytes.toStringBinary(regionName)
+ + ", numAttempt:" + numAttempt);
+ regionException = new RuntimeException("Invalid response");
+ }
+ // If the row operation encounters the region-level error, the exception of action may be
+ // null.
+ result = regionException;
+ }
// Failure: retry if it's make sense else update the errors lists
- if (result == null || result instanceof Throwable) {
+ if (result instanceof Throwable) {
+ Throwable actionException = (Throwable) result;
Row row = sentAction.getAction();
- throwable = ClientExceptionsUtil.findException(result);
+ lastException = regionException != null ? regionException
+ : ClientExceptionsUtil.findException(actionException);
// Register corresponding failures once per server/once per region.
if (!regionFailureRegistered) {
regionFailureRegistered = true;
- try {
- asyncProcess.connection.updateCachedLocations(
- tableName, regionName, row.getRow(), result, server);
- } catch (Throwable ex) {
- // That should never happen, but if it did, we want to make sure
- // we still process errors
- LOG.error("Couldn't update cached region locations: " + ex);
- }
+ updateCachedLocations(server, regionName, row.getRow(), actionException);
}
- if (failureCount == 0) {
+ if (retry == null) {
errorsByServer.reportServerError(server);
// We determine canRetry only once for all calls, after reporting server failure.
- canRetry = errorsByServer.canTryMore(numAttempt);
+ retry = errorsByServer.canTryMore(numAttempt) ?
+ Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
}
++failureCount;
- Retry retry = manageError(sentAction.getOriginalIndex(), row,
- canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server);
- if (retry == Retry.YES) {
- toReplay.add(sentAction);
- } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
- ++stopped;
- } else {
- ++failed;
+ switch (manageError(sentAction.getOriginalIndex(), row, retry, actionException,
+ server)) {
+ case YES:
+ toReplay.add(sentAction);
+ break;
+ case NO_OTHER_SUCCEEDED:
+ ++stopped;
+ break;
+ default:
+ ++failed;
+ break;
}
} else {
- if (callback != null) {
- try {
- //noinspection unchecked
- // TODO: would callback expect a replica region name if it gets one?
- this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result);
- } catch (Throwable t) {
- LOG.error("User callback threw an exception for "
- + Bytes.toStringBinary(regionName) + ", ignoring", t);
- }
- }
+ invokeCallBack(regionName, sentAction.getAction().getRow(), (CResult) result);
setResult(sentAction, result);
}
}
}
+ if (toReplay.isEmpty()) {
+ logNoResubmit(server, numAttempt, failureCount, lastException, failed, stopped);
+ } else {
+ resubmit(server, toReplay, numAttempt, failureCount, lastException);
+ }
+ }
- // The failures global to a region. We will use for multiAction we sent previously to find the
- // actions to replay.
- for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
- throwable = throwableEntry.getValue();
- byte[] region = throwableEntry.getKey();
- List<Action> actions = multiAction.actions.get(region);
- if (actions == null || actions.isEmpty()) {
- throw new IllegalStateException("Wrong response for the region: " +
- HRegionInfo.encodeRegionName(region));
- }
-
- if (failureCount == 0) {
- errorsByServer.reportServerError(server);
- canRetry = errorsByServer.canTryMore(numAttempt);
- }
- if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) {
- // For multi-actions, we don't have a table name, but we want to make sure to clear the
- // cache in case there were location-related exceptions. We don't to clear the cache
- // for every possible exception that comes through, however.
- asyncProcess.connection.clearCaches(server);
- } else {
- try {
- asyncProcess.connection.updateCachedLocations(
- tableName, region, actions.get(0).getAction().getRow(), throwable, server);
- } catch (Throwable ex) {
- // That should never happen, but if it did, we want to make sure
- // we still process errors
- LOG.error("Couldn't update cached region locations: " + ex);
- }
- }
- failureCount += actions.size();
+ private void updateCachedLocations(ServerName server, byte[] regionName, byte[] row,
+ Throwable rowException) {
+ if (tableName == null) {
+ return;
+ }
+ try {
+ asyncProcess.connection
+ .updateCachedLocations(tableName, regionName, row, rowException, server);
+ } catch (Throwable ex) {
+ // That should never happen, but if it did, we want to make sure
+ // we still process errors
+ LOG.error("Couldn't update cached region locations: " + ex);
+ }
+ }
- for (Action action : actions) {
- Row row = action.getAction();
- Retry retry = manageError(action.getOriginalIndex(), row,
- canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
- if (retry == Retry.YES) {
- toReplay.add(action);
- } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
- ++stopped;
- } else {
- ++failed;
- }
+ private void invokeCallBack(byte[] regionName, byte[] row, CResult result) {
+ if (callback != null) {
+ try {
+ //noinspection unchecked
+ // TODO: would callback expect a replica region name if it gets one?
+ this.callback.update(regionName, row, result);
+ } catch (Throwable t) {
+ LOG.error("User callback threw an exception for "
+ + Bytes.toStringBinary(regionName) + ", ignoring", t);
}
}
- if (toReplay.isEmpty()) {
- logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
- } else {
- resubmit(server, toReplay, numAttempt, failureCount, throwable);
+ }
+
+ private void cleanServerCache(ServerName server, Throwable regionException) {
+ if (tableName == null && ClientExceptionsUtil.isMetaClearingException(regionException)) {
+ // For multi-actions, we don't have a table name, but we want to make sure to clear the
+ // cache in case there were location-related exceptions. We don't want to clear the cache
+ // for every possible exception that comes through, however.
+ asyncProcess.connection.clearCaches(server);
}
}
@@ -1041,7 +1018,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
if (results[index] != state) {
throw new AssertionError("We set the callCount but someone else replaced the result");
}
- results[index] = result;
+ updateResult(index, result);
}
decActionCounter(index);
@@ -1099,7 +1076,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
if (results[index] != state) {
throw new AssertionError("We set the callCount but someone else replaced the result");
}
- results[index] = throwable;
+ updateResult(index, throwable);
}
decActionCounter(index);
}
@@ -1130,7 +1107,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
if (isFromReplica) {
throw new AssertionError("Unexpected stale result for " + row);
}
- results[index] = result;
+ updateResult(index, result);
} else {
synchronized (replicaResultLock) {
resObj = results[index];
@@ -1138,7 +1115,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
if (isFromReplica) {
throw new AssertionError("Unexpected stale result for " + row);
}
- results[index] = result;
+ updateResult(index, result);
}
}
}
@@ -1276,4 +1253,20 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
return new MultiServerCallable(asyncProcess.connection, tableName, server,
multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority());
}
+
+ private void updateResult(int index, Object result) {
+ Object current = results[index];
+ if (current != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The result is assigned repeatedly! current:" + current
+ + ", new:" + result);
+ }
+ }
+ results[index] = result;
+ }
+
+ @VisibleForTesting
+ long getNumberOfActionsInProgress() {
+ return actionsInProgress.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d8b999e6/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
new file mode 100644
index 0000000..c46385e
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcessWithRegionException.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * The purpose of this test is to make sure the region exception won't corrupt the results
+ * of batch. The prescription is shown below.
+ * 1) honor the action result rather than region exception. If the action has both a real result
+ * and a region exception, the action is fine as the exception is caused by other actions
+ * which are in the same region.
+ * 2) honor the action exception rather than region exception. If the action has both an action
+ * exception and a region exception, we deal with the action exception only. If we also
+ * handle the region exception for the same action, it will introduce a negative count of
+ * actions in progress. The AsyncRequestFuture#waitUntilDone will block forever.
+ *
+ * This bug can be reproduced by real use case. see TestMalformedCellFromClient(in branch-1.4+).
+ * It uses the batch of RowMutations to present the bug. Given that the batch of RowMutations is
+ * only supported by branch-1.4+, perhaps the branch-1.3 and branch-1.2 won't encounter this issue.
+ * We still backport the fix to branch-1.3 and branch-1.2 in case we ignore some write paths.
+ */
+@Category({ ClientTests.class, SmallTests.class })
+public class TestAsyncProcessWithRegionException {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncProcessWithRegionException.class);
+
+ private static final Result EMPTY_RESULT = Result.create(null, true);
+ private static final IOException IOE = new IOException("YOU CAN'T PASS");
+ private static final Configuration CONF = new Configuration();
+ private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
+ private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW");
+ private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW");
+ private static final byte[] BAD_ROW_WITHOUT_ACTION_EXCEPTION =
+ Bytes.toBytes("BAD_ROW_WITHOUT_ACTION_EXCEPTION");
+ private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
+ private static final ServerName SERVER_NAME = ServerName.valueOf("s1,1,1");
+ private static final RegionInfo REGION_INFO =
+ RegionInfoBuilder.newBuilder(DUMMY_TABLE)
+ .setStartKey(HConstants.EMPTY_START_ROW)
+ .setEndKey(HConstants.EMPTY_END_ROW)
+ .setSplit(false)
+ .setRegionId(1)
+ .build();
+
+ private static final HRegionLocation REGION_LOCATION =
+ new HRegionLocation(REGION_INFO, SERVER_NAME);
+
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ // disable the retry
+ CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
+ }
+
+ @Test
+ public void testSuccessivePut() throws Exception {
+ MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
+
+ List<Put> puts = new ArrayList<>(1);
+ puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
+ final int expectedSize = puts.size();
+ AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
+ arf.waitUntilDone();
+ Object[] result = arf.getResults();
+ assertEquals(expectedSize, result.length);
+ for (Object r : result) {
+ assertEquals(Result.class, r.getClass());
+ }
+ assertTrue(puts.isEmpty());
+ assertActionsInProgress(arf);
+ }
+
+ @Test
+ public void testFailedPut() throws Exception {
+ MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
+
+ List<Put> puts = new ArrayList<>(2);
+ puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
+ // this put should fail
+ puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
+ final int expectedSize = puts.size();
+
+ AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
+ arf.waitUntilDone();
+ // There is one failed put
+ assertError(arf, 1);
+ Object[] result = arf.getResults();
+ assertEquals(expectedSize, result.length);
+ assertEquals(Result.class, result[0].getClass());
+ assertTrue(result[1] instanceof IOException);
+ assertTrue(puts.isEmpty());
+ assertActionsInProgress(arf);
+ }
+
+ @Test
+ public void testFailedPutWithoutActionException() throws Exception {
+ MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
+
+ List<Put> puts = new ArrayList<>(3);
+ puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
+ // this put should fail
+ puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
+ // this put should fail, and it won't have action exception
+ puts.add(new Put(BAD_ROW_WITHOUT_ACTION_EXCEPTION).addColumn(FAMILY, FAMILY, FAMILY));
+ final int expectedSize = puts.size();
+
+ AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
+ arf.waitUntilDone();
+ // There are two failed puts
+ assertError(arf, 2);
+ Object[] result = arf.getResults();
+ assertEquals(expectedSize, result.length);
+ assertEquals(Result.class, result[0].getClass());
+ assertTrue(result[1] instanceof IOException);
+ assertTrue(result[2] instanceof IOException);
+ assertTrue(puts.isEmpty());
+ assertActionsInProgress(arf);
+ }
+
+ private static void assertError(AsyncRequestFuture arf, int expectedCountOfFailure) {
+ assertTrue(arf.hasError());
+ RetriesExhaustedWithDetailsException e = arf.getErrors();
+ List<Throwable> errors = e.getCauses();
+ assertEquals(expectedCountOfFailure, errors.size());
+ for (Throwable t : errors) {
+ assertTrue(t instanceof IOException);
+ }
+ }
+
+ private static void assertActionsInProgress(AsyncRequestFuture arf) {
+ if (arf instanceof AsyncRequestFutureImpl) {
+ assertEquals(0, ((AsyncRequestFutureImpl) arf).getNumberOfActionsInProgress());
+ }
+ }
+
+ private static ClusterConnection createHConnection() throws IOException {
+ ClusterConnection hc = Mockito.mock(ClusterConnection.class);
+ NonceGenerator ng = Mockito.mock(NonceGenerator.class);
+ Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
+ Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
+ Mockito.when(hc.getConfiguration()).thenReturn(CONF);
+ Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF));
+ setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION));
+ setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION));
+ Mockito
+ .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean()))
+ .thenReturn(Collections.singletonList(REGION_LOCATION));
+ return hc;
+ }
+
+ private static void setMockLocation(ClusterConnection hc, byte[] row, RegionLocations result)
+ throws IOException {
+ Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
+ Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
+ Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
+ Mockito.anyBoolean())).thenReturn(result);
+ }
+
+ private static class MyAsyncProcess extends AsyncProcess {
+ private final ExecutorService service = Executors.newFixedThreadPool(5);
+
+ MyAsyncProcess(ClusterConnection hc, Configuration conf) {
+ super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
+ }
+
+ public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)
+ throws InterruptedIOException {
+ return submit(AsyncProcessTask.newBuilder()
+ .setPool(service)
+ .setTableName(tableName)
+ .setRowAccess(rows)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.NORMAL)
+ .setNeedResults(true)
+ .setRpcTimeout(HConstants.DEFAULT_HBASE_RPC_TIMEOUT)
+ .setOperationTimeout(HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)
+ .build());
+ }
+
+ @Override
+ protected RpcRetryingCaller<AbstractResponse> createCaller(
+ CancellableRegionServerCallable callable, int rpcTimeout) {
+ MultiServerCallable callable1 = (MultiServerCallable) callable;
+ MultiResponse mr = new MultiResponse();
+ callable1.getMulti().actions.forEach((regionName, actions) -> {
+ actions.forEach(action -> {
+ if (Bytes.equals(action.getAction().getRow(), GOOD_ROW)) {
+ mr.add(regionName, action.getOriginalIndex(), EMPTY_RESULT);
+ } else if (Bytes.equals(action.getAction().getRow(), BAD_ROW)) {
+ mr.add(regionName, action.getOriginalIndex(), IOE);
+ }
+ });
+ });
+ mr.addException(REGION_INFO.getRegionName(), IOE);
+ return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0, 9) {
+ @Override
+ public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
+ int callTimeout) {
+ try {
+ // sleep one second in order for threadpool to start another thread instead of reusing
+ // existing one.
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // pass
+ }
+ return mr;
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d8b999e6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
new file mode 100644
index 0000000..e44a2e9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * The purpose of this test is to make sure that a region exception won't corrupt the results
+ * of a batch. The prescription is shown below.
+ * 1) Honor the action result rather than the region exception. If an action has both a real
+ *    result and a region exception, the action is fine, as the exception was caused by other
+ *    actions which are in the same region.
+ * 2) Honor the action exception rather than the region exception. If an action has both an
+ *    action exception and a region exception, we deal with the action exception only. If we
+ *    also handled the region exception for the same action, it would make the count of actions
+ *    in progress negative, and AsyncRequestFuture#waitUntilDone would block forever.
+ *
+ * The no-cluster test is in TestAsyncProcessWithRegionException.
+ */
+@Category({ MediumTests.class, ClientTests.class })
+public class TestMalformedCellFromClient {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMalformedCellFromClient.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
+  // Used both as the table's maximum cell size and as the length of the malformed value.
+  private static final int CELL_SIZE = 100;
+  private static final TableName TABLE_NAME = TableName.valueOf("TestMalformedCellFromClient");
+
+  /** Starts a one-node mini cluster with client retries disabled. */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // disable the retry
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  /** Creates the test table with its maximum cell size set to {@link #CELL_SIZE}. */
+  @Before
+  public void before() throws Exception {
+    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME)
+        .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
+        .setValue(HRegion.HBASE_MAX_CELL_SIZE_KEY, String.valueOf(CELL_SIZE)).build();
+    // Use the utility's shared Admin, as tearDown() does. Connection#getAdmin() hands out a
+    // new Admin that the caller has to close, and the previous code never closed it.
+    TEST_UTIL.getAdmin().createTable(desc);
+  }
+
+  /** Deletes every table so that each test starts from an empty cluster. */
+  @After
+  public void tearDown() throws Exception {
+    for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
+      TEST_UTIL.deleteTable(htd.getTableName());
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Builds the two-action batch shared by the tests. Index 0 is a Put of a 10-byte value to
+   * row "good". Index 1 is a RowMutations on row "fail" carrying a Put whose value is
+   * {@link #CELL_SIZE} bytes long.
+   *
+   * @return the batch, with the well-formed action first and the malformed one second
+   * @throws IOException if the Put cannot be added to the RowMutations
+   */
+  private static List<Row> createBatch() throws IOException {
+    List<Row> batches = new ArrayList<>();
+    batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]));
+    // the rm is used to prompt the region exception.
+    // see RSRpcServices#multi
+    RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
+    rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
+    batches.add(rm);
+    return batches;
+  }
+
+  /**
+   * The purpose of this ut is to check the consistency between the exception and the results.
+   * If the RetriesExhaustedWithDetailsException contains the whole batch, each result should be
+   * an IOE. Otherwise, the row operation which is not in the exception should have a real
+   * result.
+   */
+  @Test
+  public void testRegionException() throws InterruptedException, IOException {
+    List<Row> batches = createBatch();
+    Object[] results = new Object[batches.size()];
+
+    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+      Throwable caught = null;
+      try {
+        table.batch(batches, results);
+        fail("Where is the exception? We put the malformed cells!!!");
+      } catch (RetriesExhaustedWithDetailsException e) {
+        for (Throwable throwable : e.getCauses()) {
+          assertNotNull(throwable);
+        }
+        // only the malformed action may be reported as failed
+        assertEquals(1, e.getNumExceptions());
+        caught = e.getCause(0);
+      }
+      for (Object obj : results) {
+        assertNotNull(obj);
+      }
+      // index 0 is the good Put, index 1 is the malformed RowMutations
+      assertEquals(Result.class, results[0].getClass());
+      assertEquals(caught.getClass(), results[1].getClass());
+      // the good row must have been written despite the failure of the other action
+      Result result = table.get(new Get(Bytes.toBytes("good")));
+      assertEquals(1, result.size());
+      Cell cell = result.getColumnLatestCell(FAMILY, null);
+      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
+    }
+  }
+
+  /**
+   * The async-client counterpart of {@link #testRegionException()}: the future of the malformed
+   * action must complete exceptionally, and the well-formed row must still have been written.
+   */
+  @Test
+  public void testRegionExceptionByAsync() throws Exception {
+    List<Row> batches = createBatch();
+    try (AsyncConnection asyncConnection = ConnectionFactory
+        .createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
+      AsyncTable<AdvancedScanResultConsumer> table = asyncConnection.getTable(TABLE_NAME);
+      // The element type of the futures is a free type parameter of batch(); Object is used
+      // here because a batch does not produce scan result consumers.
+      List<CompletableFuture<Object>> results = table.batch(batches);
+      assertEquals(2, results.size());
+      try {
+        results.get(1).get();
+        fail("Where is the exception? We put the malformed cells!!!");
+      } catch (ExecutionException e) {
+        // pass
+      }
+      // the good row must have been written despite the failure of the other action
+      Result result = table.get(new Get(Bytes.toBytes("good"))).get();
+      assertEquals(1, result.size());
+      Cell cell = result.getColumnLatestCell(FAMILY, null);
+      assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
+    }
+  }
+}