You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/02/15 04:33:37 UTC
[11/30] hbase git commit: HBASE-19876 The exception happening in
converting pb mutation to hbase.mutation messes up the CellScanner
HBASE-19876 The exception happening in converting pb mutation to hbase.mutation messes up the CellScanner
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2f48fdbb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2f48fdbb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2f48fdbb
Branch: refs/heads/HBASE-19064
Commit: 2f48fdbb26ff555485b4aa3393d835b7dd8797a0
Parents: 16f1f5b
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Sun Feb 11 03:49:53 2018 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Tue Feb 13 21:08:59 2018 +0800
----------------------------------------------------------------------
.../hbase/shaded/protobuf/RequestConverter.java | 4 +-
.../hbase/regionserver/RSRpcServices.java | 138 +++++++------
.../client/TestMalformedCellFromClient.java | 203 +++++++++++++++++--
3 files changed, 262 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/2f48fdbb/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 8ac7058..0afcfe1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -473,7 +473,7 @@ public final class RequestConverter {
return regionActionBuilder;
}
- private static RegionAction.Builder getRegionActionBuilderWithRegion(
+ public static RegionAction.Builder getRegionActionBuilderWithRegion(
final RegionAction.Builder regionActionBuilder, final byte [] regionName) {
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
regionActionBuilder.setRegion(region);
@@ -1099,7 +1099,7 @@ public final class RequestConverter {
* @return a Condition
* @throws IOException
*/
- private static Condition buildCondition(final byte[] row, final byte[] family,
+ public static Condition buildCondition(final byte[] row, final byte[] family,
final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType)
throws IOException {
Condition.Builder builder = Condition.newBuilder();
http://git-wip-us.apache.org/repos/asf/hbase/blob/2f48fdbb/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 44934a6..5b4e3b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -560,67 +560,60 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* Mutate a list of rows atomically.
* @param cellScanner if non-null, the mutation data -- the Cell content.
*/
- private void mutateRows(final HRegion region, final OperationQuota quota,
- final List<ClientProtos.Action> actions, final CellScanner cellScanner,
- RegionActionResult.Builder builder, final ActivePolicyEnforcement spaceQuotaEnforcement)
- throws IOException {
- for (ClientProtos.Action action: actions) {
- if (action.hasGet()) {
- throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
- action.getGet());
- }
- }
- doBatchOp(builder, region, quota, actions, cellScanner, spaceQuotaEnforcement, true);
- }
-
- /**
- * Mutate a list of rows atomically.
- * @param cellScanner if non-null, the mutation data -- the Cell content.
- */
private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder,
ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
- if (!region.getRegionInfo().isMetaRegion()) {
- regionServer.cacheFlusher.reclaimMemStoreMemory();
- }
- RowMutations rm = null;
- int i = 0;
- ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
+ int countOfCompleteMutation = 0;
+ try {
+ if (!region.getRegionInfo().isMetaRegion()) {
+ regionServer.cacheFlusher.reclaimMemStoreMemory();
+ }
+ RowMutations rm = null;
+ int i = 0;
+ ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
ClientProtos.ResultOrException.newBuilder();
- for (ClientProtos.Action action: actions) {
- if (action.hasGet()) {
- throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
+ for (ClientProtos.Action action: actions) {
+ if (action.hasGet()) {
+ throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
action.getGet());
+ }
+ MutationType type = action.getMutation().getMutateType();
+ if (rm == null) {
+ rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
+ }
+ switch (type) {
+ case PUT:
+ Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
+ ++countOfCompleteMutation;
+ checkCellSizeLimit(region, put);
+ spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
+ rm.add(put);
+ break;
+ case DELETE:
+ Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
+ ++countOfCompleteMutation;
+ spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
+ rm.add(del);
+ break;
+ default:
+ throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
+ }
+ // To unify the response format with doNonAtomicRegionMutation and read through client's
+ // AsyncProcess we have to add an empty result instance per operation
+ resultOrExceptionOrBuilder.clear();
+ resultOrExceptionOrBuilder.setIndex(i++);
+ builder.addResultOrException(
+ resultOrExceptionOrBuilder.build());
}
- MutationType type = action.getMutation().getMutateType();
- if (rm == null) {
- rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size());
- }
- switch (type) {
- case PUT:
- Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
- checkCellSizeLimit(region, put);
- spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
- rm.add(put);
- break;
- case DELETE:
- Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
- spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
- rm.add(del);
- break;
- default:
- throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
+ return region.checkAndRowMutate(row, family, qualifier, op, comparator, rm);
+ } finally {
+ // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
+ // even if the malformed cells are not skipped.
+ for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
+ skipCellsForMutation(actions.get(i), cellScanner);
}
- // To unify the response format with doNonAtomicRegionMutation and read through client's
- // AsyncProcess we have to add an empty result instance per operation
- resultOrExceptionOrBuilder.clear();
- resultOrExceptionOrBuilder.setIndex(i++);
- builder.addResultOrException(
- resultOrExceptionOrBuilder.build());
}
- return region.checkAndRowMutate(row, family, qualifier, op,
- comparator, rm);
}
/**
@@ -787,9 +780,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
context.incrementResponseExceptionSize(pair.getSerializedSize());
resultOrExceptionBuilder.setIndex(action.getIndex());
builder.addResultOrException(resultOrExceptionBuilder.build());
- if (cellScanner != null) {
- skipCellsForMutation(action, cellScanner);
- }
+ skipCellsForMutation(action, cellScanner);
continue;
}
if (action.hasGet()) {
@@ -896,6 +887,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try {
doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement, false);
} catch (IOException ioe) {
+ // TODO do the refactor to avoid this catch as it is useless
+ // doBatchOp has handled the IOE for all non-atomic operations.
rpcServer.getMetrics().exception(ioe);
NameBytesPair pair = ResponseConverter.buildException(ioe);
resultOrExceptionBuilder.setException(pair);
@@ -947,6 +940,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<>();
int i = 0;
for (ClientProtos.Action action: mutations) {
+ if (action.hasGet()) {
+ throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
+ action.getGet());
+ }
MutationProto m = action.getMutation();
Mutation mutation;
if (m.getMutateType() == MutationType.PUT) {
@@ -969,8 +966,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
// HBASE-17924
- // Sort to improve lock efficiency for non-atomic batch of operations. If atomic (mostly
- // called from mutateRows()), order is preserved as its expected from the client
+ // Sort to improve lock efficiency for non-atomic batch of operations. If atomic,
+ // the order is preserved, as the client expects.
if (!atomic) {
Arrays.sort(mArray, (v1, v2) -> Row.COMPARATOR.compare(v1, v2));
}
@@ -1005,12 +1002,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
} catch (IOException ie) {
+ int processedMutationIndex = 0;
+ for (Action mutation : mutations) {
+ // A non-null mArray entry means the cell scanner has already been read for that mutation.
+ if (mArray[processedMutationIndex++] == null) {
+ skipCellsForMutation(mutation, cells);
+ }
+ if (!atomic) {
+ builder.addResultOrException(getResultOrException(ie, mutation.getIndex()));
+ }
+ }
if (atomic) {
throw ie;
}
- for (Action mutation : mutations) {
- builder.addResultOrException(getResultOrException(ie, mutation.getIndex()));
- }
}
if (regionServer.metricsRegionServer != null) {
long after = EnvironmentEdgeManager.currentTime();
@@ -2550,9 +2554,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// All Mutations in this RegionAction not executed as we can not see the Region online here
// in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
// corresponding to these Mutations.
- if (cellScanner != null) {
- skipCellsForMutations(regionAction.getActionList(), cellScanner);
- }
+ skipCellsForMutations(regionAction.getActionList(), cellScanner);
continue; // For this region it's a failure.
}
@@ -2573,8 +2575,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
cellScanner, row, family, qualifier, op,
comparator, regionActionResultBuilder, spaceQuotaEnforcement);
} else {
- mutateRows(region, quota, regionAction.getActionList(), cellScanner,
- regionActionResultBuilder, spaceQuotaEnforcement);
+ doBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
+ cellScanner, spaceQuotaEnforcement, true);
processed = Boolean.TRUE;
}
} catch (IOException e) {
@@ -2621,12 +2623,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
+ if (cellScanner == null) {
+ return;
+ }
for (Action action : actions) {
skipCellsForMutation(action, cellScanner);
}
}
private void skipCellsForMutation(Action action, CellScanner cellScanner) {
+ if (cellScanner == null) {
+ return;
+ }
try {
if (action.hasMutation()) {
MutationProto m = action.getMutation();
http://git-wip-us.apache.org/repos/asf/hbase/blob/2f48fdbb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
index e44a2e9..6b57b89 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -29,11 +30,15 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -44,23 +49,21 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
/**
- * The purpose of this test is to make sure the region exception won't corrupt the results
- * of batch. The prescription is shown below.
- * 1) honor the action result rather than region exception. If the action have both of true result
- * and region exception, the action is fine as the exception is caused by other actions
- * which are in the same region.
- * 2) honor the action exception rather than region exception. If the action have both of action
- * exception and region exception, we deal with the action exception only. If we also
- * handle the region exception for the same action, it will introduce the negative count of
- * actions in progress. The AsyncRequestFuture#waitUntilDone will block forever.
- *
- * The no-cluster test is in TestAsyncProcessWithRegionException.
+ * The purpose of this test is to ensure that the region server handles malformed cells correctly.
*/
@Category({ MediumTests.class, ClientTests.class })
public class TestMalformedCellFromClient {
-
+ private static final Logger LOG = LoggerFactory.getLogger(TestMalformedCellFromClient.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMalformedCellFromClient.class);
@@ -139,10 +142,16 @@ public class TestMalformedCellFromClient {
}
/**
- * The purpose of this ut is to check the consistency between the exception and results.
- * If the RetriesExhaustedWithDetailsException contains the whole batch,
- * each result should be of IOE. Otherwise, the row operation which is not in the exception
- * should have a true result.
+ * This test verifies that a region exception doesn't corrupt the results of a batch. The rules
+ * are as follows. 1) Honor the action result rather than the region exception. If an action has
+ * both a true result and a region exception, the action is fine, as the exception is caused by
+ * other actions in the same region. 2) Honor the action exception rather than the region
+ * exception. If an action has both an action exception and a region exception, we deal with the
+ * action exception only. If we also handled the region exception for the same action, the count
+ * of actions in progress would become negative and AsyncRequestFuture#waitUntilDone would block
+ * forever. If the RetriesExhaustedWithDetailsException contains the whole batch, each result
+ * should be an IOE. Otherwise, the row operations which are not in the exception should have a
+ * true result. The no-cluster test is in TestAsyncProcessWithRegionException.
*/
@Test
public void testRegionExceptionByAsync() throws Exception {
@@ -170,4 +179,166 @@ public class TestMalformedCellFromClient {
assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
}
}
+
+ /**
+ * The invalid cells are in rm. The rm should fail but the subsequent mutations should succeed.
+ * Currently, we have no client API to submit a request consisting of condition-rm and mutation.
+ * Hence, this test builds the request manually.
+ */
+ @Test
+ public void testAtomicOperations() throws Exception {
+ RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
+ rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[CELL_SIZE]));
+ rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10]));
+ Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]);
+
+ // build the request
+ HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+ ClientProtos.MultiRequest request =
+ ClientProtos.MultiRequest.newBuilder(createRequest(rm, r.getRegionInfo().getRegionName()))
+ .addRegionAction(ClientProtos.RegionAction.newBuilder().setRegion(RequestConverter
+ .buildRegionSpecifier(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
+ r.getRegionInfo().getRegionName())).addAction(ClientProtos.Action.newBuilder()
+ .setMutation(
+ ProtobufUtil.toMutationNoData(ClientProtos.MutationProto.MutationType.PUT, put))))
+ .build();
+
+ List<Cell> cells = new ArrayList<>();
+ for (Mutation m : rm.getMutations()) {
+ cells.addAll(m.getCellList(FAMILY));
+ }
+ cells.addAll(put.getCellList(FAMILY));
+ assertEquals(3, cells.size());
+ HBaseRpcController controller = Mockito.mock(HBaseRpcController.class);
+ Mockito.when(controller.cellScanner()).thenReturn(CellUtil.createCellScanner(cells));
+ HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(
+ TEST_UTIL.getMiniHBaseCluster()
+ .getServerHoldingRegion(TABLE_NAME, r.getRegionInfo().getRegionName()));
+
+ ClientProtos.MultiResponse response = rs.getRSRpcServices().multi(controller, request);
+ assertEquals(2, response.getRegionActionResultCount());
+ assertTrue(response.getRegionActionResultList().get(0).hasException());
+ assertFalse(response.getRegionActionResultList().get(1).hasException());
+ assertEquals(1, response.getRegionActionResultList().get(1).getResultOrExceptionCount());
+ assertTrue(
+ response.getRegionActionResultList().get(1).getResultOrExceptionList().get(0).hasResult());
+ try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+ Result result = table.get(new Get(Bytes.toBytes("good")));
+ assertEquals(1, result.size());
+ Cell cell = result.getColumnLatestCell(FAMILY, null);
+ assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
+ }
+ }
+
+ private static ClientProtos.MultiRequest createRequest(RowMutations rm, byte[] regionName)
+ throws IOException {
+ ClientProtos.RegionAction.Builder builder = RequestConverter
+ .getRegionActionBuilderWithRegion(ClientProtos.RegionAction.newBuilder(), regionName);
+ builder.setAtomic(true);
+ ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
+ ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
+ ClientProtos.Condition condition = RequestConverter
+ .buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new byte[10]),
+ HBaseProtos.CompareType.EQUAL);
+ for (Mutation mutation : rm.getMutations()) {
+ ClientProtos.MutationProto.MutationType mutateType = null;
+ if (mutation instanceof Put) {
+ mutateType = ClientProtos.MutationProto.MutationType.PUT;
+ } else if (mutation instanceof Delete) {
+ mutateType = ClientProtos.MutationProto.MutationType.DELETE;
+ } else {
+ throw new DoNotRetryIOException(
+ "RowMutations supports only put and delete, not " + mutation.getClass().getName());
+ }
+ mutationBuilder.clear();
+ ClientProtos.MutationProto mp =
+ ProtobufUtil.toMutationNoData(mutateType, mutation, mutationBuilder);
+ actionBuilder.clear();
+ actionBuilder.setMutation(mp);
+ builder.addAction(actionBuilder.build());
+ }
+ ClientProtos.MultiRequest request =
+ ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
+ .setCondition(condition).build();
+ return request;
+ }
+
+ /**
+ * This test depends on how the region server processes the batch ops.
+ * 1) group the put/delete until meeting the increment
+ * 2) process the batch of put/delete
+ * 3) process the increment
+ * see RSRpcServices#doNonAtomicRegionMutation
+ */
+ @Test
+ public void testNonAtomicOperations() throws InterruptedException, IOException {
+ Increment inc = new Increment(Bytes.toBytes("good")).addColumn(FAMILY, null, 100);
+ List<Row> batches = new ArrayList<>();
+ // the first and second puts will be grouped by the region server
+ batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
+ batches.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
+ // this Increment should succeed
+ batches.add(inc);
+ // this put should succeed
+ batches.add(new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]));
+ Object[] objs = new Object[batches.size()];
+ try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+ table.batch(batches, objs);
+ fail("Where is the exception? We put the malformed cells!!!");
+ } catch (RetriesExhaustedWithDetailsException e) {
+ assertEquals(2, e.getNumExceptions());
+ for (int i = 0; i != e.getNumExceptions(); ++i) {
+ assertNotNull(e.getCause(i));
+ assertEquals(DoNotRetryIOException.class, e.getCause(i).getClass());
+ assertEquals("fail", Bytes.toString(e.getRow(i).getRow()));
+ }
+ } finally {
+ assertObjects(objs, batches.size());
+ assertTrue(objs[0] instanceof IOException);
+ assertTrue(objs[1] instanceof IOException);
+ assertEquals(Result.class, objs[2].getClass());
+ assertEquals(Result.class, objs[3].getClass());
+ }
+ }
+
+ @Test
+ public void testRowMutations() throws InterruptedException, IOException {
+ Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[1]);
+ List<Row> batches = new ArrayList<>();
+ RowMutations mutations = new RowMutations(Bytes.toBytes("fail"));
+ // the first and second puts will be grouped by the region server
+ mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
+ mutations.add(new Put(Bytes.toBytes("fail")).addColumn(FAMILY, null, new byte[CELL_SIZE]));
+ batches.add(mutations);
+ // this RowMutations should succeed
+ mutations = new RowMutations(Bytes.toBytes("good"));
+ mutations.add(put);
+ mutations.add(put);
+ batches.add(mutations);
+ Object[] objs = new Object[batches.size()];
+ try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+ table.batch(batches, objs);
+ fail("Where is the exception? We put the malformed cells!!!");
+ } catch (RetriesExhaustedWithDetailsException e) {
+ assertEquals(1, e.getNumExceptions());
+ for (int i = 0; i != e.getNumExceptions(); ++i) {
+ assertNotNull(e.getCause(i));
+ assertTrue(e.getCause(i) instanceof IOException);
+ assertEquals("fail", Bytes.toString(e.getRow(i).getRow()));
+ }
+ } finally {
+ assertObjects(objs, batches.size());
+ assertTrue(objs[0] instanceof IOException);
+ assertEquals(Result.class, objs[1].getClass());
+ }
+ }
+
+ private static void assertObjects(Object[] objs, int expectedSize) {
+ int count = 0;
+ for (Object obj : objs) {
+ assertNotNull(obj);
+ ++count;
+ }
+ assertEquals(expectedSize, count);
+ }
}