You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ga...@apache.org on 2016/09/13 06:25:56 UTC
[2/4] hbase git commit: HBASE-16592 Unify Delete request with AP
HBASE-16592 Unify Delete request with AP
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2566cfeb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2566cfeb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2566cfeb
Branch: refs/heads/master
Commit: 2566cfeb60de644f287ac192d360f3fc15376c8f
Parents: c57acf2
Author: chenheng <ch...@apache.org>
Authored: Tue Sep 13 10:07:45 2016 +0800
Committer: Gary Helmling <ga...@apache.org>
Committed: Mon Sep 12 23:23:38 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/client/AbstractResponse.java | 38 ++++++++++++
.../hadoop/hbase/client/AsyncProcess.java | 22 ++++---
.../org/apache/hadoop/hbase/client/HTable.java | 43 ++++++++-----
.../hadoop/hbase/client/MultiResponse.java | 7 ++-
.../hadoop/hbase/client/SingleResponse.java | 65 ++++++++++++++++++++
.../hbase/protobuf/ResponseConverter.java | 14 +++++
.../hadoop/hbase/client/TestAsyncProcess.java | 18 +++---
.../hadoop/hbase/client/TestFromClientSide.java | 46 ++++++++++++++
8 files changed, 222 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/2566cfeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java
new file mode 100644
index 0000000..7878d05
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractResponse.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This class is used to extend AP to process single-action requests, such as delete, get, etc.
+ */
+@InterfaceAudience.Private
+abstract class AbstractResponse {
+
+ public enum ResponseType {
+
+ SINGLE (0),
+ MULTI (1);
+
+ ResponseType(int value) {}
+ }
+
+ public abstract ResponseType type();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2566cfeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index c5745e9..1531201 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -756,14 +756,14 @@ class AsyncProcess {
@Override
public void run() {
- MultiResponse res;
+ AbstractResponse res;
CancellableRegionServerCallable callable = currentCallable;
try {
// setup the callable based on the actions, if we don't have one already from the request
if (callable == null) {
callable = createCallable(server, tableName, multiAction);
}
- RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
+ RpcRetryingCaller<AbstractResponse> caller = createCaller(callable);
try {
if (callsInProgress != null) {
callsInProgress.add(callable);
@@ -785,9 +785,16 @@ class AsyncProcess {
receiveGlobalFailure(multiAction, server, numAttempt, t);
return;
}
-
- // Normal case: we received an answer from the server, and it's not an exception.
- receiveMultiAction(multiAction, server, res, numAttempt);
+ if (res.type() == AbstractResponse.ResponseType.MULTI) {
+ // Normal case: we received an answer from the server, and it's not an exception.
+ receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt);
+ } else {
+ if (results != null) {
+ SingleResponse singleResponse = (SingleResponse) res;
+ results[0] = singleResponse.getEntry();
+ }
+ decActionCounter(1);
+ }
} catch (Throwable t) {
// Something really bad happened. We are on the send thread that will now die.
LOG.error("Internal AsyncProcess #" + id + " error for "
@@ -1782,8 +1789,9 @@ class AsyncProcess {
* Create a caller. Isolated to be easily overridden in the tests.
*/
@VisibleForTesting
- protected RpcRetryingCaller<MultiResponse> createCaller(CancellableRegionServerCallable callable) {
- return rpcCallerFactory.<MultiResponse> newCaller();
+ protected RpcRetryingCaller<AbstractResponse> createCaller(
+ CancellableRegionServerCallable callable) {
+ return rpcCallerFactory.<AbstractResponse> newCaller();
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hbase/blob/2566cfeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 0d1b156..bcbb1da 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -524,18 +523,25 @@ public class HTable implements Table {
@Override
public void delete(final Delete delete)
throws IOException {
- RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
- this.rpcControllerFactory, getName(), delete.getRow()) {
+ CancellableRegionServerCallable<SingleResponse> callable =
+ new CancellableRegionServerCallable<SingleResponse>(
+ connection, getName(), delete.getRow(), this.rpcControllerFactory) {
@Override
- protected Boolean rpcCall() throws Exception {
+ protected SingleResponse rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), delete);
MutateResponse response = getStub().mutate(getRpcController(), request);
- return Boolean.valueOf(response.getProcessed());
+ return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
}
};
- rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
- this.operationTimeout);
+ List<Row> rows = new ArrayList<Row>();
+ rows.add(delete);
+ AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
+ null, null, callable, operationTimeout);
+ ars.waitUntilDone();
+ if (ars.hasError()) {
+ throw ars.getErrors();
+ }
}
/**
@@ -768,21 +774,30 @@ public class HTable implements Table {
final byte [] qualifier, final CompareOp compareOp, final byte [] value,
final Delete delete)
throws IOException {
- RegionServerCallable<Boolean> callable =
- new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
- getName(), row) {
+ CancellableRegionServerCallable<SingleResponse> callable =
+ new CancellableRegionServerCallable<SingleResponse>(
+ this.connection, getName(), row, this.rpcControllerFactory) {
@Override
- protected Boolean rpcCall() throws Exception {
+ protected SingleResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
new BinaryComparator(value), compareType, delete);
MutateResponse response = getStub().mutate(getRpcController(), request);
- return Boolean.valueOf(response.getProcessed());
+ return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
}
};
- return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).callWithRetries(callable,
- this.operationTimeout);
+ List<Row> rows = new ArrayList<Row>();
+ rows.add(delete);
+
+ Object[] results = new Object[1];
+ AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
+ null, results, callable, operationTimeout);
+ ars.waitUntilDone();
+ if (ars.hasError()) {
+ throw ars.getErrors();
+ }
+ return ((SingleResponse.Entry)results[0]).isProcessed();
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/2566cfeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
index 79a9ed3..18376f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* A container for Result objects, grouped by regionName.
*/
@InterfaceAudience.Private
-public class MultiResponse {
+public class MultiResponse extends AbstractResponse {
// map of regionName to map of Results by the original index for that Result
private Map<byte[], RegionResult> results = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -101,6 +101,11 @@ public class MultiResponse {
return this.results;
}
+ @Override
+ public ResponseType type() {
+ return ResponseType.MULTI;
+ }
+
static class RegionResult{
Map<Integer, Object> result = new HashMap<>();
ClientProtos.RegionLoadStats stat;
http://git-wip-us.apache.org/repos/asf/hbase/blob/2566cfeb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java
new file mode 100644
index 0000000..68897b5
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SingleResponse.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Class for a single-action response
+ */
+@InterfaceAudience.Private
+public class SingleResponse extends AbstractResponse {
+ private Entry entry = null;
+
+ @InterfaceAudience.Private
+ public static class Entry {
+ private Result result = null;
+ private boolean processed = false;
+
+ public Result getResult() {
+ return result;
+ }
+
+ public void setResult(Result result) {
+ this.result = result;
+ }
+
+ public boolean isProcessed() {
+ return processed;
+ }
+
+ public void setProcessed(boolean processed) {
+ this.processed = processed;
+ }
+
+ }
+
+ public Entry getEntry() {
+ return entry;
+ }
+
+ public void setEntry(Entry entry) {
+ this.entry = entry;
+ }
+ @Override
+ public ResponseType type() {
+ return ResponseType.SINGLE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2566cfeb/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index 76b4ccf..e5deabd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.SingleResponse;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
@@ -149,6 +150,19 @@ public final class ResponseConverter {
return results;
}
+
+ public static SingleResponse getResult(final ClientProtos.MutateRequest request,
+ final ClientProtos.MutateResponse response,
+ final CellScanner cells)
+ throws IOException {
+ SingleResponse singleResponse = new SingleResponse();
+ SingleResponse.Entry entry = new SingleResponse.Entry();
+ entry.setResult(ProtobufUtil.toResult(response.getResult(), cells));
+ entry.setProcessed(response.getProcessed());
+ singleResponse.setEntry(entry);
+ return singleResponse;
+ }
+
/**
* Wrap a throwable to an action result.
*
http://git-wip-us.apache.org/repos/asf/hbase/blob/2566cfeb/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index e7366a9..54552d9 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -218,7 +218,7 @@ public class TestAsyncProcess {
// Do nothing, to avoid the NPE when we test the ClientBackoffPolicy.
}
@Override
- protected RpcRetryingCaller<MultiResponse> createCaller(
+ protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable callable) {
callsCt.incrementAndGet();
MultiServerCallable callable1 = (MultiServerCallable) callable;
@@ -234,9 +234,9 @@ public class TestAsyncProcess {
}
});
- return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
+ return new RpcRetryingCallerImpl<AbstractResponse>(100, 10, 9) {
@Override
- public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+ public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout)
throws IOException, RuntimeException {
try {
@@ -252,7 +252,7 @@ public class TestAsyncProcess {
}
}
- static class CallerWithFailure extends RpcRetryingCallerImpl<MultiResponse>{
+ static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{
private final IOException e;
@@ -262,7 +262,7 @@ public class TestAsyncProcess {
}
@Override
- public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+ public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout)
throws IOException, RuntimeException {
throw e;
@@ -281,7 +281,7 @@ public class TestAsyncProcess {
}
@Override
- protected RpcRetryingCaller<MultiResponse> createCaller(
+ protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable callable) {
callsCt.incrementAndGet();
return new CallerWithFailure(ioe);
@@ -332,7 +332,7 @@ public class TestAsyncProcess {
}
@Override
- protected RpcRetryingCaller<MultiResponse> createCaller(
+ protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable payloadCallable) {
MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable;
final MultiResponse mr = createMultiResponse(
@@ -362,9 +362,9 @@ public class TestAsyncProcess {
replicaCalls.incrementAndGet();
}
- return new RpcRetryingCallerImpl<MultiResponse>(100, 10, 9) {
+ return new RpcRetryingCallerImpl<AbstractResponse>(100, 10, 9) {
@Override
- public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
+ public MultiResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
int callTimeout)
throws IOException, RuntimeException {
long sleep = -1;
http://git-wip-us.apache.org/repos/asf/hbase/blob/2566cfeb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index bc94b02..f465625 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
@@ -1858,6 +1859,33 @@ public class TestFromClientSide {
}
@Test
+ public void testDeleteWithFailed() throws Exception {
+ TableName TABLE = TableName.valueOf("testDeleteWithFailed");
+
+ byte [][] ROWS = makeNAscii(ROW, 6);
+ byte [][] FAMILIES = makeNAscii(FAMILY, 3);
+ byte [][] VALUES = makeN(VALUE, 5);
+ long [] ts = {1000, 2000, 3000, 4000, 5000};
+
+ Table ht = TEST_UTIL.createTable(TABLE, FAMILIES, 3);
+
+ Put put = new Put(ROW);
+ put.addColumn(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
+ ht.put(put);
+
+ // delete wrong family
+ Delete delete = new Delete(ROW);
+ delete.addFamily(FAMILIES[1], ts[0]);
+ ht.delete(delete);
+
+ Get get = new Get(ROW);
+ get.addFamily(FAMILIES[0]);
+ get.setMaxVersions(Integer.MAX_VALUE);
+ Result result = ht.get(get);
+ assertTrue(Bytes.equals(result.getValue(FAMILIES[0], QUALIFIER), VALUES[0]));
+ }
+
+ @Test
public void testDeletes() throws Exception {
TableName TABLE = TableName.valueOf("testDeletes");
@@ -4623,6 +4651,24 @@ public class TestFromClientSide {
}
@Test
+ public void testCheckAndDelete() throws IOException {
+ final byte [] value1 = Bytes.toBytes("aaaa");
+
+ Table table = TEST_UTIL.createTable(TableName.valueOf("testCheckAndDelete"),
+ FAMILY);
+
+ Put put = new Put(ROW);
+ put.addColumn(FAMILY, QUALIFIER, value1);
+ table.put(put);
+
+ Delete delete = new Delete(ROW);
+ delete.addColumns(FAMILY, QUALIFIER);
+
+ boolean ok = table.checkAndDelete(ROW, FAMILY, QUALIFIER, value1, delete);
+ assertEquals(ok, true);
+ }
+
+ @Test
public void testCheckAndDeleteWithCompareOp() throws IOException {
final byte [] value1 = Bytes.toBytes("aaaa");
final byte [] value2 = Bytes.toBytes("bbbb");