You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2018/02/13 13:55:22 UTC
hbase git commit: HBASE-19876 The exception happening in converting
pb mutation to hbase.mutation messes up the CellScanner
Repository: hbase
Updated Branches:
refs/heads/branch-1.2 ebdfbaf2a -> b59c3151d
HBASE-19876 The exception happening in converting pb mutation to hbase.mutation messes up the CellScanner
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b59c3151
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b59c3151
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b59c3151
Branch: refs/heads/branch-1.2
Commit: b59c3151dfe41926d242c5c581a248e272834aa1
Parents: ebdfbaf
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Sat Feb 10 00:19:40 2018 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Tue Feb 13 21:50:43 2018 +0800
----------------------------------------------------------------------
.../hadoop/hbase/protobuf/RequestConverter.java | 4 +-
.../hbase/regionserver/RSRpcServices.java | 135 +++++++++------
.../client/TestMalformedCellFromClient.java | 173 +++++++++++++++++++
3 files changed, 259 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/b59c3151/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index 295d78c..67a9179 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -465,7 +465,7 @@ public final class RequestConverter {
return regionActionBuilder;
}
- private static RegionAction.Builder getRegionActionBuilderWithRegion(
+ public static RegionAction.Builder getRegionActionBuilderWithRegion(
final RegionAction.Builder regionActionBuilder, final byte [] regionName) {
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
regionActionBuilder.setRegion(region);
@@ -1050,7 +1050,7 @@ public final class RequestConverter {
* @return a Condition
* @throws IOException
*/
- private static Condition buildCondition(final byte[] row,
+ public static Condition buildCondition(final byte[] row,
final byte[] family, final byte [] qualifier,
final ByteArrayComparable comparator,
final CompareType compareType) throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/b59c3151/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 914467c..4be1234 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -421,32 +421,45 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private ClientProtos.RegionLoadStats mutateRows(final Region region,
final List<ClientProtos.Action> actions,
final CellScanner cellScanner) throws IOException {
- if (!region.getRegionInfo().isMetaTable()) {
- regionServer.cacheFlusher.reclaimMemStoreMemory();
- }
- RowMutations rm = null;
- for (ClientProtos.Action action: actions) {
- if (action.hasGet()) {
- throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
- action.getGet());
+ int countOfCompleteMutation = 0;
+ try {
+ if (!region.getRegionInfo().isMetaTable()) {
+ regionServer.cacheFlusher.reclaimMemStoreMemory();
}
- MutationType type = action.getMutation().getMutateType();
- if (rm == null) {
- rm = new RowMutations(action.getMutation().getRow().toByteArray());
+ RowMutations rm = null;
+ for (ClientProtos.Action action: actions) {
+ if (action.hasGet()) {
+ throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
+ action.getGet());
+ }
+ MutationType type = action.getMutation().getMutateType();
+ if (rm == null) {
+ rm = new RowMutations(action.getMutation().getRow().toByteArray());
+ }
+ switch (type) {
+ case PUT:
+ Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
+ ++countOfCompleteMutation;
+ rm.add(put);
+ break;
+ case DELETE:
+ Delete delete = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
+ ++countOfCompleteMutation;
+ rm.add(delete);
+ break;
+ default:
+ throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
+ }
}
- switch (type) {
- case PUT:
- rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
- break;
- case DELETE:
- rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
- break;
- default:
- throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
+ region.mutateRow(rm);
+ return ((HRegion)region).getRegionStats();
+ } finally {
+ // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
+ // even if the malformed cells are not skipped.
+ for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
+ skipCellsForMutation(actions.get(i), cellScanner);
}
}
- region.mutateRow(rm);
- return ((HRegion)region).getRegionStats();
}
/**
@@ -464,31 +477,44 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, ByteArrayComparable comparator) throws IOException {
- if (!region.getRegionInfo().isMetaTable()) {
- regionServer.cacheFlusher.reclaimMemStoreMemory();
- }
- RowMutations rm = null;
- for (ClientProtos.Action action: actions) {
- if (action.hasGet()) {
- throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
- action.getGet());
+ int countOfCompleteMutation = 0;
+ try {
+ if (!region.getRegionInfo().isMetaTable()) {
+ regionServer.cacheFlusher.reclaimMemStoreMemory();
}
- MutationType type = action.getMutation().getMutateType();
- if (rm == null) {
- rm = new RowMutations(action.getMutation().getRow().toByteArray());
+ RowMutations rm = null;
+ for (ClientProtos.Action action: actions) {
+ if (action.hasGet()) {
+ throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
+ action.getGet());
+ }
+ MutationType type = action.getMutation().getMutateType();
+ if (rm == null) {
+ rm = new RowMutations(action.getMutation().getRow().toByteArray());
+ }
+ switch (type) {
+ case PUT:
+ Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
+ ++countOfCompleteMutation;
+ rm.add(put);
+ break;
+ case DELETE:
+ Delete delete = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
+ ++countOfCompleteMutation;
+ rm.add(delete);
+ break;
+ default:
+ throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
+ }
}
- switch (type) {
- case PUT:
- rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
- break;
- case DELETE:
- rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
- break;
- default:
- throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
+ return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, true);
+ } finally {
+ // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
+ // even if the malformed cells are not skipped.
+ for (int i = countOfCompleteMutation; i < actions.size(); ++i) {
+ skipCellsForMutation(actions.get(i), cellScanner);
}
}
- return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
}
/**
@@ -624,9 +650,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
setException(ResponseConverter.buildException(sizeIOE));
resultOrExceptionBuilder.setIndex(action.getIndex());
builder.addResultOrException(resultOrExceptionBuilder.build());
- if (cellScanner != null) {
- skipCellsForMutation(action, cellScanner);
- }
+ skipCellsForMutation(action, cellScanner);
continue;
}
if (action.hasGet()) {
@@ -784,8 +808,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
} catch (IOException ie) {
- for (int i = 0; i < mutations.size(); i++) {
- builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
+ int processedMutationIndex = 0;
+ for (Action mutation : mutations) {
+ // The non-null mArray[i] means the cell scanner has been read.
+ if (mArray[processedMutationIndex++] == null) {
+ skipCellsForMutation(mutation, cells);
+ }
+ builder.addResultOrException(getResultOrException(ie, mutation.getIndex()));
}
}
if (regionServer.metricsRegionServer != null) {
@@ -2129,9 +2158,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// All Mutations in this RegionAction not executed as we can not see the Region online here
// in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
// corresponding to these Mutations.
- if (cellScanner != null) {
- skipCellsForMutations(regionAction.getActionList(), cellScanner);
- }
+ skipCellsForMutations(regionAction.getActionList(), cellScanner);
continue; // For this region it's a failure.
}
@@ -2183,12 +2210,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
+ if (cellScanner == null) {
+ return;
+ }
for (Action action : actions) {
skipCellsForMutation(action, cellScanner);
}
}
private void skipCellsForMutation(Action action, CellScanner cellScanner) {
+ if (cellScanner == null) {
+ return;
+ }
try {
if (action.hasMutation()) {
MutationProto m = action.getMutation();
http://git-wip-us.apache.org/repos/asf/hbase/blob/b59c3151/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
new file mode 100644
index 0000000..892b4d4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * The purpose of this test is to ensure whether rs deals with the malformed cells correctly.
+ */
+@Category({ MediumTests.class, ClientTests.class })
+public class TestMalformedCellFromClient {
+ private static final Log LOG = LogFactory.getLog(TestMalformedCellFromClient.class);
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final byte[] FAMILY = Bytes.toBytes("testFamily");
+ private static final TableName TABLE_NAME = TableName.valueOf("TestMalformedCellFromClient");
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // disable the retry
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+ TEST_UTIL.startMiniCluster(1);
+ }
+
+ @Before
+ public void before() throws Exception {
+ HTableDescriptor desc =
+ new HTableDescriptor(TABLE_NAME).addFamily(new HColumnDescriptor(FAMILY));
+ TEST_UTIL.getConnection().getAdmin().createTable(desc);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ for (HTableDescriptor htd : TEST_UTIL.getHBaseAdmin().listTables()) {
+ TEST_UTIL.deleteTable(htd.getTableName());
+ }
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * The invalid cells is in rm. The rm should fail but the subsequent mutations should succeed.
+ * Currently, we have no client api to submit the request consisting of condition-rm and mutation.
+ * Hence, this test build the request manually.
+ */
+ @Test
+ public void testAtomicOperations() throws Exception {
+ RowMutations rm = new RowMutations(Bytes.toBytes("fail"));
+ rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10]));
+ rm.add(new Put(rm.getRow()).addColumn(FAMILY, null, new byte[10]));
+ Put put = new Put(Bytes.toBytes("good")).addColumn(FAMILY, null, new byte[10]);
+
+ // build the request
+ HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+ ClientProtos.MultiRequest request =
+ ClientProtos.MultiRequest.newBuilder(createRequest(rm, r.getRegionInfo().getRegionName()))
+ .addRegionAction(ClientProtos.RegionAction.newBuilder().setRegion(RequestConverter
+ .buildRegionSpecifier(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME,
+ r.getRegionInfo().getRegionName())).addAction(ClientProtos.Action.newBuilder()
+ .setMutation(
+ ProtobufUtil.toMutationNoData(ClientProtos.MutationProto.MutationType.PUT, put))))
+ .build();
+
+ List<Cell> cells = new ArrayList<>();
+ for (Mutation m : rm.getMutations()) {
+ cells.addAll(m.getCellList(FAMILY));
+ }
+ cells.addAll(put.getCellList(FAMILY));
+ assertEquals(3, cells.size());
+ PayloadCarryingRpcController controller = Mockito.mock(PayloadCarryingRpcController.class);
+ Mockito.when(controller.cellScanner()).thenReturn(CellUtil.createCellScanner(cells));
+ HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
+ ClientProtos.MultiResponse response = rs.getRSRpcServices().multi(controller, request);
+ assertEquals(2, response.getRegionActionResultCount());
+ assertTrue(response.getRegionActionResultList().get(0).hasException());
+ assertFalse(response.getRegionActionResultList().get(1).hasException());
+ assertEquals(1, response.getRegionActionResultList().get(1).getResultOrExceptionCount());
+ assertTrue(
+ response.getRegionActionResultList().get(1).getResultOrExceptionList().get(0).hasResult());
+ try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
+ Result result = table.get(new Get(Bytes.toBytes("good")));
+ assertEquals(1, result.size());
+ Cell cell = result.getColumnLatestCell(FAMILY, null);
+ assertTrue(Bytes.equals(CellUtil.cloneValue(cell), new byte[10]));
+ }
+ }
+
+ private static ClientProtos.MultiRequest createRequest(RowMutations rm, byte[] regionName)
+ throws IOException {
+ ClientProtos.RegionAction.Builder builder = RequestConverter
+ .getRegionActionBuilderWithRegion(ClientProtos.RegionAction.newBuilder(), regionName);
+ builder.setAtomic(true);
+ ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
+ ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
+ ClientProtos.Condition condition = RequestConverter
+ .buildCondition(rm.getRow(), FAMILY, FAMILY, new BinaryComparator(new byte[10]),
+ HBaseProtos.CompareType.EQUAL);
+ for (Mutation mutation : rm.getMutations()) {
+ ClientProtos.MutationProto.MutationType mutateType = null;
+ if (mutation instanceof Put) {
+ mutateType = ClientProtos.MutationProto.MutationType.PUT;
+ } else if (mutation instanceof Delete) {
+ mutateType = ClientProtos.MutationProto.MutationType.DELETE;
+ } else {
+ throw new DoNotRetryIOException(
+ "RowMutations supports only put and delete, not " + mutation.getClass().getName());
+ }
+ mutationBuilder.clear();
+ ClientProtos.MutationProto mp =
+ ProtobufUtil.toMutationNoData(mutateType, mutation, mutationBuilder);
+ actionBuilder.clear();
+ actionBuilder.setMutation(mp);
+ // add a get to fail the rm
+ actionBuilder.setGet(ProtobufUtil.toGet(new Get(rm.getRow())));
+ builder.addAction(actionBuilder.build());
+ }
+ ClientProtos.MultiRequest request =
+ ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
+ .setCondition(condition).build();
+ return request;
+ }
+}
\ No newline at end of file