You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/09/19 23:17:29 UTC

[16/50] [abbrv] hbase git commit: HBASE-16592 Unify Delete request with AP

HBASE-16592 Unify Delete request with AP


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/831fb3cc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/831fb3cc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/831fb3cc

Branch: refs/heads/hbase-12439
Commit: 831fb3ccb8a0ba449d249962379afd268e8fe032
Parents: 1cdc5ac
Author: chenheng <ch...@apache.org>
Authored: Tue Sep 13 10:07:45 2016 +0800
Committer: chenheng <ch...@apache.org>
Committed: Tue Sep 13 10:07:45 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AbstractResponse.java   | 38 ++++++++++++
 .../hadoop/hbase/client/AsyncProcess.java       | 22 ++++---
 .../org/apache/hadoop/hbase/client/HTable.java  | 43 ++++++++-----
 .../hadoop/hbase/client/MultiResponse.java      |  7 ++-
 .../hadoop/hbase/client/SingleResponse.java     | 65 ++++++++++++++++++++
 .../hbase/protobuf/ResponseConverter.java       | 14 +++++
 .../hadoop/hbase/client/TestAsyncProcess.java   | 18 +++---
 .../hadoop/hbase/client/TestFromClientSide.java | 46 ++++++++++++++
 8 files changed, 222 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/831fb3cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java
new file mode 100644
index 0000000..7878d05
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This class is used to extend AP to process single action request, like delete, get etc.
+ */
+@InterfaceAudience.Private
+abstract class AbstractResponse {
+
+  public enum ResponseType {
+
+    SINGLE    (0),
+    MULTI       (1);
+
+    ResponseType(int value) {}
+  }
+
+  public abstract ResponseType type();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/831fb3cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index c5745e9..1531201 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -756,14 +756,14 @@ class AsyncProcess {
 
       @Override
       public void run() {
-        MultiResponse res;
+        AbstractResponse res;
         CancellableRegionServerCallable callable = currentCallable;
         try {
           // setup the callable based on the actions, if we don't have one already from the request
           if (callable == null) {
             callable = createCallable(server, tableName, multiAction);
           }
-          RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
+          RpcRetryingCaller<AbstractResponse> caller = createCaller(callable);
           try {
             if (callsInProgress != null) {
               callsInProgress.add(callable);
@@ -785,9 +785,16 @@ class AsyncProcess {
             receiveGlobalFailure(multiAction, server, numAttempt, t);
             return;
           }
-
-          // Normal case: we received an answer from the server, and it's not an exception.
-          receiveMultiAction(multiAction, server, res, numAttempt);
+          if (res.type() == AbstractResponse.ResponseType.MULTI) {
+            // Normal case: we received an answer from the server, and it's not an exception.
+            receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt);
+          } else {
+            if (results != null) {
+              SingleResponse singleResponse = (SingleResponse) res;
+              results[0] = singleResponse.getEntry();
+            }
+            decActionCounter(1);
+          }
         } catch (Throwable t) {
           // Something really bad happened. We are on the send thread that will now die.
           LOG.error("Internal AsyncProcess #" + id + " error for "
@@ -1782,8 +1789,9 @@ class AsyncProcess {
    * Create a caller. Isolated to be easily overridden in the tests.
    */
   @VisibleForTesting
-  protected RpcRetryingCaller<MultiResponse> createCaller(CancellableRegionServerCallable callable) {
-    return rpcCallerFactory.<MultiResponse> newCaller();
+  protected RpcRetryingCaller<AbstractResponse> createCaller(
+      CancellableRegionServerCallable callable) {
+    return rpcCallerFactory.<AbstractResponse> newCaller();
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hbase/blob/831fb3cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 0d1b156..bcbb1da 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -524,18 +523,25 @@ public class HTable implements Table {
   @Override
   public void delete(final Delete delete)
   throws IOException {
-    RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
-        this.rpcControllerFactory, getName(), delete.getRow()) {
+    CancellableRegionServerCallable<SingleResponse> callable =
+        new CancellableRegionServerCallable<SingleResponse>(
+            connection, getName(), delete.getRow(), this.rpcControllerFactory) {
       @Override
-      protected Boolean rpcCall() throws Exception {
+      protected SingleResponse rpcCall() throws Exception {
         MutateRequest request = RequestConverter.buildMutateRequest(
           getLocation().getRegionInfo().getRegionName(), delete);
         MutateResponse response = getStub().mutate(getRpcController(), request);
-        return Boolean.valueOf(response.getProcessed());
+        return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
       }
     };
-    rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    List<Row> rows = new ArrayList<Row>();
+    rows.add(delete);
+    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
+        null, null, callable, operationTimeout);
+    ars.waitUntilDone();
+    if (ars.hasError()) {
+      throw ars.getErrors();
+    }
   }
 
   /**
@@ -768,21 +774,30 @@ public class HTable implements Table {
       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
       final Delete delete)
   throws IOException {
-    RegionServerCallable<Boolean> callable =
-        new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
-            getName(), row) {
+    CancellableRegionServerCallable<SingleResponse> callable =
+        new CancellableRegionServerCallable<SingleResponse>(
+            this.connection, getName(), row, this.rpcControllerFactory) {
       @Override
-      protected Boolean rpcCall() throws Exception {
+      protected SingleResponse rpcCall() throws Exception {
         CompareType compareType = CompareType.valueOf(compareOp.name());
         MutateRequest request = RequestConverter.buildMutateRequest(
           getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
           new BinaryComparator(value), compareType, delete);
         MutateResponse response = getStub().mutate(getRpcController(), request);
-        return Boolean.valueOf(response.getProcessed());
+        return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
       }
     };
-    return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    List<Row> rows = new ArrayList<Row>();
+    rows.add(delete);
+
+    Object[] results = new Object[1];
+    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
+        null, results, callable, operationTimeout);
+    ars.waitUntilDone();
+    if (ars.hasError()) {
+      throw ars.getErrors();
+    }
+    return ((SingleResponse.Entry)results[0]).isProcessed();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/831fb3cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
index 79a9ed3..18376f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  * A container for Result objects, grouped by regionName.
  */
 @InterfaceAudience.Private
-public class MultiResponse {
+public class MultiResponse extends AbstractResponse {
 
   // map of regionName to map of Results by the original index for that Result
   private Map<byte[], RegionResult> results = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -101,6 +101,11 @@ public class MultiResponse {
     return this.results;
   }
 
+  @Override
+  public ResponseType type() {
+    return ResponseType.MULTI;
+  }
+
   static class RegionResult{
     Map<Integer, Object> result = new HashMap<>();
     ClientProtos.RegionLoadStats stat;

http://git-wip-us.apache.org/repos/asf/hbase/blob/831fb3cc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java
new file mode 100644
index 0000000..68897b5
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Class for single action response
+ */
+@InterfaceAudience.Private
+public class SingleResponse extends AbstractResponse {
+  private Entry entry = null;
+
+  @InterfaceAudience.Private
+  public static class Entry {
+    private Result result = null;
+    private boolean processed = false;
+
+    public Result getResult() {
+      return result;
+    }
+
+    public void setResult(Result result) {
+      this.result = result;
+    }
+
+    public boolean isProcessed() {
+      return processed;
+    }
+
+    public void setProcessed(boolean processed) {
+      this.processed = processed;
+    }
+
+  }
+
+  public Entry getEntry() {
+    return entry;
+  }
+
+  public void setEntry(Entry entry) {
+    this.entry = entry;
+  }
+  @Override
+  public ResponseType type() {
+    return ResponseType.SINGLE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/831fb3cc/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index 76b4ccf..e5deabd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.SingleResponse;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
@@ -149,6 +150,19 @@ public final class ResponseConverter {
     return results;
   }
 
+
+  public static SingleResponse getResult(final ClientProtos.MutateRequest request,
+                                         final ClientProtos.MutateResponse response,
+                                         final CellScanner cells)
+      throws IOException {
+    SingleResponse singleResponse = new SingleResponse();
+    SingleResponse.Entry entry = new SingleResponse.Entry();
+    entry.setResult(ProtobufUtil.toResult(response.getResult(), cells));
+    entry.setProcessed(response.getProcessed());
+    singleResponse.setEntry(entry);
+    return singleResponse;
+  }
+
   /**
    * Wrap a throwable to an action result.
    *

http://git-wip-us.apache.org/repos/asf/hbase/blob/831fb3cc/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index e7366a9..54552d9 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -218,7 +218,7 @@ public class TestAsyncProcess {
       // Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
     }
     @Override
-    protected RpcRetryingCaller<MultiResponse> createCaller(
+    protected RpcRetryingCaller<AbstractResponse> createCaller(
         CancellableRegionServerCallable callable) {
       callsCt.incrementAndGet();
       MultiServerCallable callable1 = (MultiServerCallable) callable;
@@ -234,9 +234,9 @@ public class TestAsyncProcess {
             }
           });
 
-      return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
+      return new RpcRetryingCallerImpl<AbstractResponse>(100, 10, 9) {
         @Override
-        public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+        public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
                                                 int callTimeout)
         throws IOException, RuntimeException {
           try {
@@ -252,7 +252,7 @@ public class TestAsyncProcess {
     }
   }
 
-  static class CallerWithFailure extends RpcRetryingCallerImpl<MultiResponse>{
+  static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{
 
     private final IOException e;
 
@@ -262,7 +262,7 @@ public class TestAsyncProcess {
     }
 
     @Override
-    public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+    public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
                                             int callTimeout)
         throws IOException, RuntimeException {
       throw e;
@@ -281,7 +281,7 @@ public class TestAsyncProcess {
     }
 
     @Override
-    protected RpcRetryingCaller<MultiResponse> createCaller(
+    protected RpcRetryingCaller<AbstractResponse> createCaller(
       CancellableRegionServerCallable callable) {
       callsCt.incrementAndGet();
       return new CallerWithFailure(ioe);
@@ -332,7 +332,7 @@ public class TestAsyncProcess {
     }
 
     @Override
-    protected RpcRetryingCaller<MultiResponse> createCaller(
+    protected RpcRetryingCaller<AbstractResponse> createCaller(
         CancellableRegionServerCallable payloadCallable) {
       MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable;
       final MultiResponse mr = createMultiResponse(
@@ -362,9 +362,9 @@ public class TestAsyncProcess {
         replicaCalls.incrementAndGet();
       }
 
-      return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
+      return new RpcRetryingCallerImpl<AbstractResponse>(100, 10, 9) {
         @Override
-        public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+        public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
                                                 int callTimeout)
         throws IOException, RuntimeException {
           long sleep = -1;

http://git-wip-us.apache.org/repos/asf/hbase/blob/831fb3cc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index bc94b02..f465625 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
@@ -1858,6 +1859,33 @@ public class TestFromClientSide {
   }
 
   @Test
+  public void testDeleteWithFailed() throws Exception {
+    TableName TABLE = TableName.valueOf("testDeleteWithFailed");
+
+    byte [][] ROWS = makeNAscii(ROW, 6);
+    byte [][] FAMILIES = makeNAscii(FAMILY, 3);
+    byte [][] VALUES = makeN(VALUE, 5);
+    long [] ts = {1000, 2000, 3000, 4000, 5000};
+
+    Table ht = TEST_UTIL.createTable(TABLE, FAMILIES, 3);
+
+    Put put = new Put(ROW);
+    put.addColumn(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
+    ht.put(put);
+
+    // delete wrong family
+    Delete delete = new Delete(ROW);
+    delete.addFamily(FAMILIES[1], ts[0]);
+    ht.delete(delete);
+
+    Get get = new Get(ROW);
+    get.addFamily(FAMILIES[0]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    Result result = ht.get(get);
+    assertTrue(Bytes.equals(result.getValue(FAMILIES[0], QUALIFIER), VALUES[0]));
+  }
+
+  @Test
   public void testDeletes() throws Exception {
     TableName TABLE = TableName.valueOf("testDeletes");
 
@@ -4623,6 +4651,24 @@ public class TestFromClientSide {
   }
 
   @Test
+  public void testCheckAndDelete() throws IOException {
+    final byte [] value1 = Bytes.toBytes("aaaa");
+
+    Table table = TEST_UTIL.createTable(TableName.valueOf("testCheckAndDelete"),
+        FAMILY);
+
+    Put put = new Put(ROW);
+    put.addColumn(FAMILY, QUALIFIER, value1);
+    table.put(put);
+
+    Delete delete = new Delete(ROW);
+    delete.addColumns(FAMILY, QUALIFIER);
+
+    boolean ok = table.checkAndDelete(ROW, FAMILY, QUALIFIER, value1, delete);
+    assertEquals(ok, true);
+  }
+
+  @Test
   public void testCheckAndDeleteWithCompareOp() throws IOException {
     final byte [] value1 = Bytes.toBytes("aaaa");
     final byte [] value2 = Bytes.toBytes("bbbb");