You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2020/06/08 00:51:34 UTC

[hbase] branch master updated: HBASE-24515 batch Increment/Append fails when retrying the RPC (#1864)

This is an automated email from the ASF dual-hosted git repository.

brfrn169 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 022dd96  HBASE-24515 batch Increment/Append fails when retrying the RPC (#1864)
022dd96 is described below

commit 022dd9687f66bd37b0f597393f018dcae7679dfc
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Mon Jun 8 09:51:21 2020 +0900

    HBASE-24515 batch Increment/Append fails when retrying the RPC (#1864)
    
    Signed-off-by: Viraj Jasani <vi...@gmail.com>
---
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 54 ------------------
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 31 +++++++++--
 .../hadoop/hbase/client/TestFromClientSide.java    | 64 +++++++++++++++++++---
 .../hbase/client/TestIncrementsFromClientSide.java | 56 +++++++++++++++++--
 4 files changed, 136 insertions(+), 69 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index fcdd18b..4de8778 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -970,60 +970,6 @@ public final class ProtobufUtil {
     throw new IOException("Unknown mutation type " + type);
   }
 
-  /**
-   * Convert a protocol buffer Mutate to a Get.
-   * @param proto the protocol buffer Mutate to convert.
-   * @param cellScanner
-   * @return the converted client get.
-   * @throws IOException
-   */
-  public static Get toGet(final MutationProto proto, final CellScanner cellScanner)
-      throws IOException {
-    MutationType type = proto.getMutateType();
-    assert type == MutationType.INCREMENT || type == MutationType.APPEND : type.name();
-    byte[] row = proto.hasRow() ? proto.getRow().toByteArray() : null;
-    Get get = null;
-    int cellCount = proto.hasAssociatedCellCount() ? proto.getAssociatedCellCount() : 0;
-    if (cellCount > 0) {
-      // The proto has metadata only and the data is separate to be found in the cellScanner.
-      if (cellScanner == null) {
-        throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: "
-            + TextFormat.shortDebugString(proto));
-      }
-      for (int i = 0; i < cellCount; i++) {
-        if (!cellScanner.advance()) {
-          throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i
-              + " no cell returned: " + TextFormat.shortDebugString(proto));
-        }
-        Cell cell = cellScanner.current();
-        if (get == null) {
-          get = new Get(CellUtil.cloneRow(cell));
-        }
-        get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
-      }
-    } else {
-      get = new Get(row);
-      for (ColumnValue column : proto.getColumnValueList()) {
-        byte[] family = column.getFamily().toByteArray();
-        for (QualifierValue qv : column.getQualifierValueList()) {
-          byte[] qualifier = qv.getQualifier().toByteArray();
-          if (!qv.hasValue()) {
-            throw new DoNotRetryIOException("Missing required field: qualifier value");
-          }
-          get.addColumn(family, qualifier);
-        }
-      }
-    }
-    if (proto.hasTimeRange()) {
-      TimeRange timeRange = toTimeRange(proto.getTimeRange());
-      get.setTimeRange(timeRange.getMin(), timeRange.getMax());
-    }
-    for (NameBytesPair attribute : proto.getAttributeList()) {
-      get.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
-    }
-    return get;
-  }
-
   public static ClientProtos.Scan.ReadType toReadType(Scan.ReadType readType) {
     switch (readType) {
       case DEFAULT:
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index cdb34fd..1890a4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -724,8 +724,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           r = region.append(append, nonceGroup, nonce);
         } else {
           // convert duplicate append to get
-          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false,
-              nonceGroup, nonce);
+          List<Cell> results = region.get(toGet(append), false, nonceGroup, nonce);
           r = Result.create(results);
         }
         success = true;
@@ -771,8 +770,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           r = region.increment(increment, nonceGroup, nonce);
         } else {
           // convert duplicate increment to get
-          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup,
-              nonce);
+          List<Cell> results = region.get(toGet(increment), false, nonceGroup, nonce);
           r = Result.create(results);
         }
         success = true;
@@ -794,6 +792,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return r == null ? Result.EMPTY_RESULT : r;
   }
 
+  private static Get toGet(final Mutation mutation) throws IOException {
+    if(!(mutation instanceof Increment) && !(mutation instanceof Append)) {
+      throw new AssertionError("mutation must be a instance of Increment or Append");
+    }
+    Get get = new Get(mutation.getRow());
+    CellScanner cellScanner = mutation.cellScanner();
+    while (!cellScanner.advance()) {
+      Cell cell = cellScanner.current();
+      get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
+    }
+    if (mutation instanceof Increment) {
+      // Increment
+      Increment increment = (Increment) mutation;
+      get.setTimeRange(increment.getTimeRange().getMin(), increment.getTimeRange().getMax());
+    } else {
+      // Append
+      Append append = (Append) mutation;
+      get.setTimeRange(append.getTimeRange().getMin(), append.getTimeRange().getMax());
+    }
+    for (Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) {
+      get.setAttribute(entry.getKey(), entry.getValue());
+    }
+    return get;
+  }
+
   /**
    * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
    * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 475993d..70d832f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,10 +39,8 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
@@ -114,13 +113,18 @@ public class TestFromClientSide extends FromClientSideBase {
    */
   @Test
   public void testDuplicateAppend() throws Exception {
-    HTableDescriptor hdt = TEST_UTIL
-      .createTableDescriptor(name.getTableName(), HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3,
-        HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
+    TableDescriptorBuilder.ModifyableTableDescriptor mtd = TEST_UTIL
+      .createModifyableTableDescriptor(name.getTableName(),
+        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
+        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
     Map<String, String> kvs = new HashMap<>();
     kvs.put(SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
-    hdt.addCoprocessor(SleepAtFirstRpcCall.class.getName(), null, 1, kvs);
-    TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close();
+    mtd.setCoprocessor(CoprocessorDescriptorBuilder
+      .newBuilder(SleepAtFirstRpcCall.class.getName())
+      .setPriority(1)
+      .setProperties(kvs)
+      .build());
+    TEST_UTIL.createTable(mtd, new byte[][] { ROW }).close();
 
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
     c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
@@ -148,6 +152,52 @@ public class TestFromClientSide extends FromClientSideBase {
   }
 
   /**
+   * Test batch append result when there are duplicate rpc request.
+   */
+  @Test
+  public void testDuplicateBatchAppend() throws Exception {
+    TableDescriptorBuilder.ModifyableTableDescriptor mtd = TEST_UTIL
+      .createModifyableTableDescriptor(name.getTableName(),
+        ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
+        ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
+    Map<String, String> kvs = new HashMap<>();
+    kvs.put(SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
+    mtd.setCoprocessor(CoprocessorDescriptorBuilder
+      .newBuilder(SleepAtFirstRpcCall.class.getName())
+      .setPriority(1)
+      .setProperties(kvs)
+      .build());
+    TEST_UTIL.createTable(mtd, new byte[][] { ROW }).close();
+
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
+    // Client will retry because rpc timeout is small than the sleep time of first rpc call
+    c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
+
+    try (Connection connection = ConnectionFactory.createConnection(c);
+      Table table = connection.getTableBuilder(name.getTableName(), null).
+        setOperationTimeout(3 * 1000).build()) {
+      Append append = new Append(ROW);
+      append.addColumn(HBaseTestingUtility.fam1, QUALIFIER, VALUE);
+
+      // Batch append
+      Object[] results = new Object[1];
+      table.batch(Collections.singletonList(append), results);
+
+      // Verify expected result
+      Cell[] cells = ((Result) results[0]).rawCells();
+      assertEquals(1, cells.length);
+      assertKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, VALUE);
+
+      // Verify expected result again
+      Result readResult = table.get(new Get(ROW));
+      cells = readResult.rawCells();
+      assertEquals(1, cells.length);
+      assertKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, VALUE);
+    }
+  }
+
+  /**
    * Basic client side validation of HBASE-4536
    */
   @Test public void testKeepDeletedCells() throws Exception {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
index 8395aa2..58c88b5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,7 +37,6 @@ import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -101,11 +101,16 @@ public class TestIncrementsFromClientSide {
    */
   @Test
   public void testDuplicateIncrement() throws Exception {
-    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName()));
+    TableDescriptorBuilder.ModifyableTableDescriptor mtd =
+      TEST_UTIL.createModifyableTableDescriptor(name.getMethodName());
     Map<String, String> kvs = new HashMap<>();
     kvs.put(SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
-    hdt.addCoprocessor(SleepAtFirstRpcCall.class.getName(), null, 1, kvs);
-    TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close();
+    mtd.setCoprocessor(CoprocessorDescriptorBuilder
+      .newBuilder(SleepAtFirstRpcCall.class.getName())
+      .setPriority(1)
+      .setProperties(kvs)
+      .build());
+    TEST_UTIL.createTable(mtd, new byte[][] { ROW }).close();
 
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
     c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
@@ -131,6 +136,49 @@ public class TestIncrementsFromClientSide {
     }
   }
 
+  /**
+   * Test batch increment result when there are duplicate rpc request.
+   */
+  @Test
+  public void testDuplicateBatchIncrement() throws Exception {
+    TableDescriptorBuilder.ModifyableTableDescriptor mtd =
+      TEST_UTIL.createModifyableTableDescriptor(name.getMethodName());
+    Map<String, String> kvs = new HashMap<>();
+    kvs.put(SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
+    mtd.setCoprocessor(CoprocessorDescriptorBuilder
+      .newBuilder(SleepAtFirstRpcCall.class.getName())
+      .setPriority(1)
+      .setProperties(kvs)
+      .build());
+    TEST_UTIL.createTable(mtd, new byte[][] { ROW }).close();
+
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
+    // Client will retry beacuse rpc timeout is small than the sleep time of first rpc call
+    c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
+
+    try (Connection connection = ConnectionFactory.createConnection(c);
+      Table table = connection.getTableBuilder(TableName.valueOf(name.getMethodName()), null)
+        .setOperationTimeout(3 * 1000).build()) {
+      Increment inc = new Increment(ROW);
+      inc.addColumn(HBaseTestingUtility.fam1, QUALIFIER, 1);
+
+      // Batch increment
+      Object[] results = new Object[1];
+      table.batch(Collections.singletonList(inc), results);
+
+      Cell[] cells = ((Result) results[0]).rawCells();
+      assertEquals(1, cells.length);
+      assertIncrementKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, 1);
+
+      // Verify expected result
+      Result readResult = table.get(new Get(ROW));
+      cells = readResult.rawCells();
+      assertEquals(1, cells.length);
+      assertIncrementKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, 1);
+    }
+  }
+
   @Test
   public void testIncrementWithDeletes() throws Exception {
     LOG.info("Starting " + this.name.getMethodName());