Posted to commits@hbase.apache.org by br...@apache.org on 2020/06/08 01:57:33 UTC
[hbase] branch branch-2.3 updated: HBASE-24515 batch Increment/Append fails when retrying the RPC
This is an automated email from the ASF dual-hosted git repository.
brfrn169 pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new a30d3e1 HBASE-24515 batch Increment/Append fails when retrying the RPC
a30d3e1 is described below
commit a30d3e1a55fe866a1d81c782a2eff0490225d079
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Mon Jun 8 09:51:21 2020 +0900
HBASE-24515 batch Increment/Append fails when retrying the RPC
Signed-off-by: Viraj Jasani <vi...@gmail.com>
---
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 54 ----------------------
.../hadoop/hbase/regionserver/RSRpcServices.java | 31 +++++++++++--
.../hadoop/hbase/client/TestFromClientSide.java | 41 ++++++++++++++++
.../hbase/client/TestIncrementsFromClientSide.java | 41 ++++++++++++++++
4 files changed, 109 insertions(+), 58 deletions(-)
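In short: when an Increment or Append sent through the batch (multi) RPC path times out on the client and is retried, the region server recognizes the duplicate via the operation's nonce and answers with a Get of the affected row instead of applying the mutation a second time. Building that Get from the raw MutationProto with ProtobufUtil.toGet failed on the batch path (hence the issue title), so this patch builds it from the already-decoded Increment/Append through a new private toGet helper in RSRpcServices and removes the now-unused ProtobufUtil.toGet overload.

Below is a minimal client-side sketch of the scenario the new tests exercise, assuming a reachable cluster and an existing table; the class, table, row, and column names are illustrative only and are not part of the patch. The committed tests additionally install the HConnectionTestingUtility.SleepAtFirstRpcCall coprocessor so the first RPC really does exceed the timeout and trigger a retry; that setup is omitted here.

import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class DuplicateBatchIncrementSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative names; the table and family must already exist.
    TableName tableName = TableName.valueOf("test_tbl");
    byte[] row = Bytes.toBytes("row1");
    byte[] family = Bytes.toBytes("f");
    byte[] qualifier = Bytes.toBytes("q");

    Configuration conf = HBaseConfiguration.create();
    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
    // A short rpc timeout (with a larger operation timeout below) makes the
    // client retry the same Increment when the first call is slow; the retry
    // carries the same nonce, so the server treats it as a duplicate.
    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);

    try (Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTableBuilder(tableName, null)
            .setOperationTimeout(3 * 1000).build()) {
      Increment inc = new Increment(row);
      inc.addColumn(family, qualifier, 1);

      // Batch of one: before this fix, a retried batch Increment/Append
      // could fail in the server-side duplicate-to-Get conversion.
      Object[] results = new Object[1];
      table.batch(Collections.singletonList(inc), results);

      long returned = Bytes.toLong(((Result) results[0]).getValue(family, qualifier));
      long stored = Bytes.toLong(table.get(new Get(row)).getValue(family, qualifier));
      // Both values should reflect a single increment, even though the rpc was retried.
      System.out.println("batch result=" + returned + ", stored=" + stored);
    }
  }
}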
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 26666c6..09db446 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -925,60 +925,6 @@ public final class ProtobufUtil {
throw new IOException("Unknown mutation type " + type);
}
- /**
- * Convert a protocol buffer Mutate to a Get.
- * @param proto the protocol buffer Mutate to convert.
- * @param cellScanner
- * @return the converted client get.
- * @throws IOException
- */
- public static Get toGet(final MutationProto proto, final CellScanner cellScanner)
- throws IOException {
- MutationType type = proto.getMutateType();
- assert type == MutationType.INCREMENT || type == MutationType.APPEND : type.name();
- byte[] row = proto.hasRow() ? proto.getRow().toByteArray() : null;
- Get get = null;
- int cellCount = proto.hasAssociatedCellCount() ? proto.getAssociatedCellCount() : 0;
- if (cellCount > 0) {
- // The proto has metadata only and the data is separate to be found in the cellScanner.
- if (cellScanner == null) {
- throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: "
- + TextFormat.shortDebugString(proto));
- }
- for (int i = 0; i < cellCount; i++) {
- if (!cellScanner.advance()) {
- throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i
- + " no cell returned: " + TextFormat.shortDebugString(proto));
- }
- Cell cell = cellScanner.current();
- if (get == null) {
- get = new Get(CellUtil.cloneRow(cell));
- }
- get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
- }
- } else {
- get = new Get(row);
- for (ColumnValue column : proto.getColumnValueList()) {
- byte[] family = column.getFamily().toByteArray();
- for (QualifierValue qv : column.getQualifierValueList()) {
- byte[] qualifier = qv.getQualifier().toByteArray();
- if (!qv.hasValue()) {
- throw new DoNotRetryIOException("Missing required field: qualifier value");
- }
- get.addColumn(family, qualifier);
- }
- }
- }
- if (proto.hasTimeRange()) {
- TimeRange timeRange = toTimeRange(proto.getTimeRange());
- get.setTimeRange(timeRange.getMin(), timeRange.getMax());
- }
- for (NameBytesPair attribute : proto.getAttributeList()) {
- get.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
- }
- return get;
- }
-
public static ClientProtos.Scan.ReadType toReadType(Scan.ReadType readType) {
switch (readType) {
case DEFAULT:
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 8f57a37..8cab3fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -720,8 +720,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
r = region.append(append, nonceGroup, nonce);
} else {
// convert duplicate append to get
- List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false,
- nonceGroup, nonce);
+ List<Cell> results = region.get(toGet(append), false, nonceGroup, nonce);
r = Result.create(results);
}
success = true;
@@ -767,8 +766,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
r = region.increment(increment, nonceGroup, nonce);
} else {
// convert duplicate increment to get
- List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup,
- nonce);
+ List<Cell> results = region.get(toGet(increment), false, nonceGroup, nonce);
r = Result.create(results);
}
success = true;
@@ -790,6 +788,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return r == null ? Result.EMPTY_RESULT : r;
}
+ private static Get toGet(final Mutation mutation) throws IOException {
+ if (!(mutation instanceof Increment) && !(mutation instanceof Append)) {
+ throw new AssertionError("mutation must be an instance of Increment or Append");
+ }
+ Get get = new Get(mutation.getRow());
+ CellScanner cellScanner = mutation.cellScanner();
+ while (cellScanner.advance()) {
+ Cell cell = cellScanner.current();
+ get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
+ }
+ if (mutation instanceof Increment) {
+ // Increment
+ Increment increment = (Increment) mutation;
+ get.setTimeRange(increment.getTimeRange().getMin(), increment.getTimeRange().getMax());
+ } else {
+ // Append
+ Append append = (Append) mutation;
+ get.setTimeRange(append.getTimeRange().getMin(), append.getTimeRange().getMax());
+ }
+ for (Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) {
+ get.setAttribute(entry.getKey(), entry.getValue());
+ }
+ return get;
+ }
+
/**
* Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
* done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index ac02e9b..0c9654d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -147,6 +148,46 @@ public class TestFromClientSide extends FromClientSideBase {
}
/**
+ * Test batch append result when there are duplicate rpc requests.
+ */
+ @Test
+ public void testDuplicateBatchAppend() throws Exception {
+ HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(name.getTableName());
+ Map<String, String> kvs = new HashMap<>();
+ kvs.put(HConnectionTestingUtility.SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
+ hdt.addCoprocessor(HConnectionTestingUtility.SleepAtFirstRpcCall.class.getName(), null, 1,
+ kvs);
+ TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close();
+
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+ c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
+ // Client will retry because the rpc timeout is smaller than the sleep time of the first rpc call
+ c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
+
+ try (Connection connection = ConnectionFactory.createConnection(c);
+ Table table = connection.getTableBuilder(name.getTableName(), null).
+ setOperationTimeout(3 * 1000).build()) {
+ Append append = new Append(ROW);
+ append.addColumn(HBaseTestingUtility.fam1, QUALIFIER, VALUE);
+
+ // Batch append
+ Object[] results = new Object[1];
+ table.batch(Collections.singletonList(append), results);
+
+ // Verify expected result
+ Cell[] cells = ((Result) results[0]).rawCells();
+ assertEquals(1, cells.length);
+ assertKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, VALUE);
+
+ // Verify expected result again
+ Result readResult = table.get(new Get(ROW));
+ cells = readResult.rawCells();
+ assertEquals(1, cells.length);
+ assertKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, VALUE);
+ }
+ }
+
+ /**
* Basic client side validation of HBASE-4536
*/
@Test public void testKeepDeletedCells() throws Exception {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
index 90c7e27..6c230df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -138,6 +139,46 @@ public class TestIncrementsFromClientSide {
}
}
+ /**
+ * Test batch increment result when there are duplicate rpc requests.
+ */
+ @Test
+ public void testDuplicateBatchIncrement() throws Exception {
+ HTableDescriptor hdt =
+ TEST_UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName()));
+ Map<String, String> kvs = new HashMap<>();
+ kvs.put(HConnectionTestingUtility.SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
+ hdt.addCoprocessor(HConnectionTestingUtility.SleepAtFirstRpcCall.class.getName(), null, 1,
+ kvs);
+ TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close();
+
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+ c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
+ // Client will retry because the rpc timeout is smaller than the sleep time of the first rpc call
+ c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
+
+ try (Connection connection = ConnectionFactory.createConnection(c);
+ Table table = connection.getTableBuilder(TableName.valueOf(name.getMethodName()), null)
+ .setOperationTimeout(3 * 1000).build()) {
+ Increment inc = new Increment(ROW);
+ inc.addColumn(HBaseTestingUtility.fam1, QUALIFIER, 1);
+
+ // Batch increment
+ Object[] results = new Object[1];
+ table.batch(Collections.singletonList(inc), results);
+
+ Cell[] cells = ((Result) results[0]).rawCells();
+ assertEquals(1, cells.length);
+ assertIncrementKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, 1);
+
+ // Verify expected result
+ Result readResult = table.get(new Get(ROW));
+ cells = readResult.rawCells();
+ assertEquals(1, cells.length);
+ assertIncrementKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, 1);
+ }
+ }
+
@Test
public void testIncrementWithDeletes() throws Exception {
LOG.info("Starting " + this.name.getMethodName());