You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/01/04 07:39:48 UTC
[46/50] [abbrv] hbase git commit: HBASE-17346 AggregationClient cleanup
HBASE-17346 AggregationClient cleanup
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0a93241b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0a93241b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0a93241b
Branch: refs/heads/hbase-12439
Commit: 0a93241b61e6183b5671a4e7940e6212a17acd66
Parents: 521730e
Author: Michael Stack <st...@apache.org>
Authored: Sun Jan 1 16:01:10 2017 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Sun Jan 1 16:01:10 2017 -0800
----------------------------------------------------------------------
.../client/coprocessor/AggregationClient.java | 94 +++++++++++++++-----
1 file changed, 71 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/0a93241b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
index cde7d41..d236342 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
@@ -59,6 +58,8 @@ import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
/**
* This client class is for invoking the aggregate functions deployed on the
@@ -81,13 +82,60 @@ import com.google.protobuf.Message;
* </ul>
* <p>Call {@link #close()} when done.
*/
-@InterfaceAudience.Private
+@InterfaceAudience.Public
public class AggregationClient implements Closeable {
// TODO: This class is not used. Move to examples?
private static final Log log = LogFactory.getLog(AggregationClient.class);
private final Connection connection;
/**
+ * An RpcController implementation for use here in this endpoint.
+ */
+ static class AggregationClientRpcController implements RpcController {
+ private String errorText;
+ private boolean cancelled = false;
+ private boolean failed = false;
+
+ @Override
+ public String errorText() {
+ return this.errorText;
+ }
+
+ @Override
+ public boolean failed() {
+ return this.failed;
+ }
+
+ @Override
+ public boolean isCanceled() {
+ return this.cancelled;
+ }
+
+ @Override
+ public void notifyOnCancel(RpcCallback<Object> arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void reset() {
+ this.errorText = null;
+ this.cancelled = false;
+ this.failed = false;
+ }
+
+ @Override
+ public void setFailed(String errorText) {
+ this.failed = true;
+ this.errorText = errorText;
+ }
+
+ @Override
+ public void startCancel() {
+ this.cancelled = true;
+ }
+ }
+
+ /**
* Constructor with Conf object
* @param cfg
*/
@@ -160,13 +208,13 @@ public class AggregationClient implements Closeable {
new Batch.Call<AggregateService, R>() {
@Override
public R call(AggregateService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = new AggregationClientRpcController();
CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
instance.getMax(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
- if (controller.failedOnException()) {
- throw controller.getFailedOn();
+ if (controller.failed()) {
+ throw new IOException(controller.errorText());
}
if (response.getFirstPartCount() > 0) {
ByteString b = response.getFirstPart(0);
@@ -248,13 +296,13 @@ public class AggregationClient implements Closeable {
@Override
public R call(AggregateService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = new AggregationClientRpcController();
CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
instance.getMin(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
- if (controller.failedOnException()) {
- throw controller.getFailedOn();
+ if (controller.failed()) {
+ throw new IOException(controller.errorText());
}
if (response.getFirstPartCount() > 0) {
ByteString b = response.getFirstPart(0);
@@ -323,13 +371,13 @@ public class AggregationClient implements Closeable {
new Batch.Call<AggregateService, Long>() {
@Override
public Long call(AggregateService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = new AggregationClientRpcController();
CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
instance.getRowNum(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
- if (controller.failedOnException()) {
- throw controller.getFailedOn();
+ if (controller.failed()) {
+ throw new IOException(controller.errorText());
}
byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
@@ -388,14 +436,14 @@ public class AggregationClient implements Closeable {
new Batch.Call<AggregateService, S>() {
@Override
public S call(AggregateService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = new AggregationClientRpcController();
// Not sure what is going on here why I have to do these casts. TODO.
CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
instance.getSum(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
- if (controller.failedOnException()) {
- throw controller.getFailedOn();
+ if (controller.failed()) {
+ throw new IOException(controller.errorText());
}
if (response.getFirstPartCount() == 0) {
return null;
@@ -456,13 +504,13 @@ public class AggregationClient implements Closeable {
new Batch.Call<AggregateService, Pair<S, Long>>() {
@Override
public Pair<S, Long> call(AggregateService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = new AggregationClientRpcController();
CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
instance.getAvg(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
- if (controller.failedOnException()) {
- throw controller.getFailedOn();
+ if (controller.failed()) {
+ throw new IOException(controller.errorText());
}
Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
if (response.getFirstPartCount() == 0) {
@@ -560,13 +608,13 @@ public class AggregationClient implements Closeable {
new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
@Override
public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = new AggregationClientRpcController();
CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
instance.getStd(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
- if (controller.failedOnException()) {
- throw controller.getFailedOn();
+ if (controller.failed()) {
+ throw new IOException(controller.errorText());
}
Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
if (response.getFirstPartCount() == 0) {
@@ -676,13 +724,13 @@ public class AggregationClient implements Closeable {
new Batch.Call<AggregateService, List<S>>() {
@Override
public List<S> call(AggregateService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
+ RpcController controller = new AggregationClientRpcController();
CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
instance.getMedian(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
- if (controller.failedOnException()) {
- throw controller.getFailedOn();
+ if (controller.failed()) {
+ throw new IOException(controller.errorText());
}
List<S> list = new ArrayList<S>();