You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2020/06/08 01:57:33 UTC

[hbase] branch branch-2.3 updated: HBASE-24515 batch Increment/Append fails when retrying the RPC

This is an automated email from the ASF dual-hosted git repository.

brfrn169 pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new a30d3e1  HBASE-24515 batch Increment/Append fails when retrying the RPC
a30d3e1 is described below

commit a30d3e1a55fe866a1d81c782a2eff0490225d079
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Mon Jun 8 09:51:21 2020 +0900

    HBASE-24515 batch Increment/Append fails when retrying the RPC
    
    Signed-off-by: Viraj Jasani <vi...@gmail.com>
---
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 54 ----------------------
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 31 +++++++++++--
 .../hadoop/hbase/client/TestFromClientSide.java    | 41 ++++++++++++++++
 .../hbase/client/TestIncrementsFromClientSide.java | 41 ++++++++++++++++
 4 files changed, 109 insertions(+), 58 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 26666c6..09db446 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -925,60 +925,6 @@ public final class ProtobufUtil {
     throw new IOException("Unknown mutation type " + type);
   }
 
-  /**
-   * Convert a protocol buffer Mutate to a Get.
-   * @param proto the protocol buffer Mutate to convert.
-   * @param cellScanner
-   * @return the converted client get.
-   * @throws IOException
-   */
-  public static Get toGet(final MutationProto proto, final CellScanner cellScanner)
-      throws IOException {
-    MutationType type = proto.getMutateType();
-    assert type == MutationType.INCREMENT || type == MutationType.APPEND : type.name();
-    byte[] row = proto.hasRow() ? proto.getRow().toByteArray() : null;
-    Get get = null;
-    int cellCount = proto.hasAssociatedCellCount() ? proto.getAssociatedCellCount() : 0;
-    if (cellCount > 0) {
-      // The proto has metadata only and the data is separate to be found in the cellScanner.
-      if (cellScanner == null) {
-        throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: "
-            + TextFormat.shortDebugString(proto));
-      }
-      for (int i = 0; i < cellCount; i++) {
-        if (!cellScanner.advance()) {
-          throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i
-              + " no cell returned: " + TextFormat.shortDebugString(proto));
-        }
-        Cell cell = cellScanner.current();
-        if (get == null) {
-          get = new Get(CellUtil.cloneRow(cell));
-        }
-        get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
-      }
-    } else {
-      get = new Get(row);
-      for (ColumnValue column : proto.getColumnValueList()) {
-        byte[] family = column.getFamily().toByteArray();
-        for (QualifierValue qv : column.getQualifierValueList()) {
-          byte[] qualifier = qv.getQualifier().toByteArray();
-          if (!qv.hasValue()) {
-            throw new DoNotRetryIOException("Missing required field: qualifier value");
-          }
-          get.addColumn(family, qualifier);
-        }
-      }
-    }
-    if (proto.hasTimeRange()) {
-      TimeRange timeRange = toTimeRange(proto.getTimeRange());
-      get.setTimeRange(timeRange.getMin(), timeRange.getMax());
-    }
-    for (NameBytesPair attribute : proto.getAttributeList()) {
-      get.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
-    }
-    return get;
-  }
-
   public static ClientProtos.Scan.ReadType toReadType(Scan.ReadType readType) {
     switch (readType) {
       case DEFAULT:
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 8f57a37..8cab3fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -720,8 +720,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           r = region.append(append, nonceGroup, nonce);
         } else {
           // convert duplicate append to get
-          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false,
-              nonceGroup, nonce);
+          List<Cell> results = region.get(toGet(append), false, nonceGroup, nonce);
           r = Result.create(results);
         }
         success = true;
@@ -767,8 +766,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           r = region.increment(increment, nonceGroup, nonce);
         } else {
           // convert duplicate increment to get
-          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup,
-              nonce);
+          List<Cell> results = region.get(toGet(increment), false, nonceGroup, nonce);
           r = Result.create(results);
         }
         success = true;
@@ -790,6 +788,31 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return r == null ? Result.EMPTY_RESULT : r;
   }
 
+  /** Builds the Get that reads back the columns touched by the given Increment/Append. */
+  private static Get toGet(final Mutation mutation) throws IOException {
+    if(!(mutation instanceof Increment) && !(mutation instanceof Append)) {
+      throw new AssertionError("mutation must be a instance of Increment or Append");
+    }
+    Get get = new Get(mutation.getRow());
+    CellScanner cellScanner = mutation.cellScanner();
+    while (cellScanner.advance()) { // true while another cell is available
+      Cell cell = cellScanner.current();
+      get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
+    }
+    // Copy the mutation's time range onto the Get.
+    if (mutation instanceof Increment) {
+      Increment increment = (Increment) mutation;
+      get.setTimeRange(increment.getTimeRange().getMin(), increment.getTimeRange().getMax());
+    } else {
+      Append append = (Append) mutation;
+      get.setTimeRange(append.getTimeRange().getMin(), append.getTimeRange().getMax());
+    }
+    for (Entry<String, byte[]> entry : mutation.getAttributesMap().entrySet()) {
+      get.setAttribute(entry.getKey(), entry.getValue());
+    }
+    return get;
+  }
+
   /**
    * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
    * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index ac02e9b..0c9654d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -147,6 +148,46 @@ public class TestFromClientSide extends FromClientSideBase {
   }
 
   /**
+   * Test the batch append result when the rpc request is duplicated by a client retry.
+   */
+  @Test
+  public void testDuplicateBatchAppend() throws Exception {
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(name.getTableName());
+    Map<String, String> kvs = new HashMap<>();
+    kvs.put(HConnectionTestingUtility.SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
+    hdt.addCoprocessor(HConnectionTestingUtility.SleepAtFirstRpcCall.class.getName(), null, 1,
+      kvs);
+    TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close();
+
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
+    // Client will retry because the rpc timeout is smaller than the first rpc call's sleep time
+    c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
+
+    try (Connection connection = ConnectionFactory.createConnection(c);
+      Table table = connection.getTableBuilder(name.getTableName(), null).
+        setOperationTimeout(3 * 1000).build()) {
+      Append append = new Append(ROW);
+      append.addColumn(HBaseTestingUtility.fam1, QUALIFIER, VALUE);
+
+      // Batch append
+      Object[] results = new Object[1];
+      table.batch(Collections.singletonList(append), results);
+
+      // Verify expected result
+      Cell[] cells = ((Result) results[0]).rawCells();
+      assertEquals(1, cells.length);
+      assertKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, VALUE);
+
+      // Verify expected result again
+      Result readResult = table.get(new Get(ROW));
+      cells = readResult.rawCells();
+      assertEquals(1, cells.length);
+      assertKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, VALUE);
+    }
+  }
+
+  /**
    * Basic client side validation of HBASE-4536
    */
   @Test public void testKeepDeletedCells() throws Exception {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
index 90c7e27..6c230df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -138,6 +139,46 @@ public class TestIncrementsFromClientSide {
     }
   }
 
+  /**
+   * Test the batch increment result when the rpc request is duplicated by a client retry.
+   */
+  @Test
+  public void testDuplicateBatchIncrement() throws Exception {
+    HTableDescriptor hdt =
+      TEST_UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName()));
+    Map<String, String> kvs = new HashMap<>();
+    kvs.put(HConnectionTestingUtility.SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
+    hdt.addCoprocessor(HConnectionTestingUtility.SleepAtFirstRpcCall.class.getName(), null, 1,
+      kvs);
+    TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close();
+
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
+    // Client will retry because the rpc timeout is smaller than the first rpc call's sleep time
+    c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
+
+    try (Connection connection = ConnectionFactory.createConnection(c);
+      Table table = connection.getTableBuilder(TableName.valueOf(name.getMethodName()), null)
+        .setOperationTimeout(3 * 1000).build()) {
+      Increment inc = new Increment(ROW);
+      inc.addColumn(HBaseTestingUtility.fam1, QUALIFIER, 1);
+
+      // Batch increment
+      Object[] results = new Object[1];
+      table.batch(Collections.singletonList(inc), results);
+
+      Cell[] cells = ((Result) results[0]).rawCells();
+      assertEquals(1, cells.length);
+      assertIncrementKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, 1);
+
+      // Verify expected result
+      Result readResult = table.get(new Get(ROW));
+      cells = readResult.rawCells();
+      assertEquals(1, cells.length);
+      assertIncrementKey(cells[0], ROW, HBaseTestingUtility.fam1, QUALIFIER, 1);
+    }
+  }
+
   @Test
   public void testIncrementWithDeletes() throws Exception {
     LOG.info("Starting " + this.name.getMethodName());