You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/02/09 00:13:59 UTC

[19/32] hbase git commit: HBASE-15214 Valid mutate Ops fail with RPC Codec in use and region moves across.

HBASE-15214 Valid mutate Ops fail with RPC Codec in use and region moves across.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7239056c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7239056c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7239056c

Branch: refs/heads/hbase-12439
Commit: 7239056c78cc6eb2867c8865ab45821d3e51328a
Parents: 4265bf2
Author: anoopsjohn <an...@gmail.com>
Authored: Sat Feb 6 02:40:49 2016 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Sat Feb 6 02:40:49 2016 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/protobuf/ProtobufUtil.java     | 18 ++++-------
 .../hbase/regionserver/RSRpcServices.java       | 34 ++++++++++++++++++++
 .../hadoop/hbase/client/TestMultiParallel.java  |  4 +++
 3 files changed, 45 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7239056c/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index fe76780..e9a1223 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -543,7 +543,7 @@ public final class ProtobufUtil {
     MutationType type = proto.getMutateType();
     assert type == MutationType.PUT: type.name();
     long timestamp = proto.hasTimestamp()? proto.getTimestamp(): HConstants.LATEST_TIMESTAMP;
-    Put put = null;
+    Put put = proto.hasRow() ? new Put(proto.getRow().toByteArray(), timestamp) : null;
     int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
     if (cellCount > 0) {
       // The proto has metadata only and the data is separate to be found in the cellScanner.
@@ -563,9 +563,7 @@ public final class ProtobufUtil {
         put.add(cell);
       }
     } else {
-      if (proto.hasRow()) {
-        put = new Put(proto.getRow().asReadOnlyByteBuffer(), timestamp);
-      } else {
+      if (put == null) {
         throw new IllegalArgumentException("row cannot be null");
       }
       // The proto has the metadata and the data itself
@@ -639,12 +637,8 @@ public final class ProtobufUtil {
   throws IOException {
     MutationType type = proto.getMutateType();
     assert type == MutationType.DELETE : type.name();
-    byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null;
-    long timestamp = HConstants.LATEST_TIMESTAMP;
-    if (proto.hasTimestamp()) {
-      timestamp = proto.getTimestamp();
-    }
-    Delete delete = null;
+    long timestamp = proto.hasTimestamp() ? proto.getTimestamp() : HConstants.LATEST_TIMESTAMP;
+    Delete delete = proto.hasRow() ? new Delete(proto.getRow().toByteArray(), timestamp) : null;
     int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
     if (cellCount > 0) {
       // The proto has metadata only and the data is separate to be found in the cellScanner.
@@ -667,7 +661,9 @@ public final class ProtobufUtil {
         delete.addDeleteMarker(cell);
       }
     } else {
-      delete = new Delete(row, timestamp);
+      if (delete == null) {
+        throw new IllegalArgumentException("row cannot be null");
+      }
       for (ColumnValue column: proto.getColumnValueList()) {
         byte[] family = column.getFamily().toByteArray();
         for (QualifierValue qv: column.getQualifierValueList()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/7239056c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 3e133c4..e346c34 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -129,6 +129,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
@@ -696,6 +697,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
               setException(ResponseConverter.buildException(sizeIOE));
           resultOrExceptionBuilder.setIndex(action.getIndex());
           builder.addResultOrException(resultOrExceptionBuilder.build());
+          if (cellScanner != null) {
+            skipCellsForMutation(action, cellScanner);
+          }
           continue;
         }
         if (action.hasGet()) {
@@ -2239,6 +2243,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         rpcServer.getMetrics().exception(e);
         regionActionResultBuilder.setException(ResponseConverter.buildException(e));
         responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
+        // The Mutations in this RegionAction are not executed, as the Region could not be found
+        // online on this RS. They will be retried from the Client. Skip all the Cells in the
+        // CellScanner corresponding to these Mutations.
+        if (cellScanner != null) {
+          skipCellsForMutations(regionAction.getActionList(), cellScanner);
+        }
         continue;  // For this region it's a failure.
       }
 
@@ -2296,6 +2306,30 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return responseBuilder.build();
   }
 
+  private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
+    for (Action action : actions) {
+      skipCellsForMutation(action, cellScanner);
+    }
+  }
+
+  private void skipCellsForMutation(Action action, CellScanner cellScanner) {
+    try {
+      if (action.hasMutation()) {
+        MutationProto m = action.getMutation();
+        if (m.hasAssociatedCellCount()) {
+          for (int i = 0; i < m.getAssociatedCellCount(); i++) {
+            cellScanner.advance();
+          }
+        }
+      }
+    } catch (IOException e) {
+      // No need to handle these individual Mutation-level issues. In any case, this entire
+      // RegionAction is marked as failed, as we could not see the Region here. On the client
+      // side, the top-level RegionAction exception will be considered first.
+      LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
+    }
+  }
+
   /**
    * Mutate data in a table.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/7239056c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index af3a54e..d295ab2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -35,10 +35,12 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.codec.KeyValueCodec;
 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -75,6 +77,8 @@ public class TestMultiParallel {
     //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
     //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
     //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
+    UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+        KeyValueCodec.class.getCanonicalName());
     UTIL.startMiniCluster(slaves);
     Table t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
     UTIL.waitTableEnabled(TEST_TABLE);