You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/01/04 03:16:20 UTC

hbase git commit: Revert "HBASE-17346 AggregationClient cleanup" Revert because I had wrong JIRA number (Spotted by Duo Zhang)

Repository: hbase
Updated Branches:
  refs/heads/master 69ce5967f -> 0583d7934


Revert "HBASE-17346 AggregationClient cleanup"
Revert because I had wrong JIRA number (Spotted by Duo Zhang)

This reverts commit 0a93241b61e6183b5671a4e7940e6212a17acd66.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0583d793
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0583d793
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0583d793

Branch: refs/heads/master
Commit: 0583d79346e16962bb35fed7587d56d2ec71c0fa
Parents: 69ce596
Author: Michael Stack <st...@apache.org>
Authored: Tue Jan 3 19:15:53 2017 -0800
Committer: Michael Stack <st...@apache.org>
Committed: Tue Jan 3 19:15:53 2017 -0800

----------------------------------------------------------------------
 .../client/coprocessor/AggregationClient.java   | 94 +++++---------------
 1 file changed, 23 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0583d793/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
index d236342..cde7d41 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
@@ -58,8 +59,6 @@ import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
 
 /**
  * This client class is for invoking the aggregate functions deployed on the
@@ -82,60 +81,13 @@ import com.google.protobuf.RpcController;
  * </ul>
  * <p>Call {@link #close()} when done.
  */
-@InterfaceAudience.Public
+@InterfaceAudience.Private
 public class AggregationClient implements Closeable {
   // TODO: This class is not used.  Move to examples?
   private static final Log log = LogFactory.getLog(AggregationClient.class);
   private final Connection connection;
 
   /**
-   * An RpcController implementation for use here in this endpoint.
-   */
-  static class AggregationClientRpcController implements RpcController {
-    private String errorText;
-    private boolean cancelled = false;
-    private boolean failed = false;
-
-    @Override
-    public String errorText() {
-      return this.errorText;
-    }
-
-    @Override
-    public boolean failed() {
-      return this.failed;
-    }
-
-    @Override
-    public boolean isCanceled() {
-      return this.cancelled;
-    }
-
-    @Override
-    public void notifyOnCancel(RpcCallback<Object> arg0) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void reset() {
-      this.errorText = null;
-      this.cancelled = false;
-      this.failed = false;
-    }
-
-    @Override
-    public void setFailed(String errorText) {
-      this.failed = true;
-      this.errorText = errorText;
-    }
-
-    @Override
-    public void startCancel() {
-      this.cancelled = true;
-    }
-  }
-
-  /**
    * Constructor with Conf object
    * @param cfg
    */
@@ -208,13 +160,13 @@ public class AggregationClient implements Closeable {
         new Batch.Call<AggregateService, R>() {
           @Override
           public R call(AggregateService instance) throws IOException {
-            RpcController controller = new AggregationClientRpcController();
+            ServerRpcController controller = new ServerRpcController();
             CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
                 new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
             instance.getMax(controller, requestArg, rpcCallback);
             AggregateResponse response = rpcCallback.get();
-            if (controller.failed()) {
-              throw new IOException(controller.errorText());
+            if (controller.failedOnException()) {
+              throw controller.getFailedOn();
             }
             if (response.getFirstPartCount() > 0) {
               ByteString b = response.getFirstPart(0);
@@ -296,13 +248,13 @@ public class AggregationClient implements Closeable {
 
           @Override
           public R call(AggregateService instance) throws IOException {
-            RpcController controller = new AggregationClientRpcController();
+            ServerRpcController controller = new ServerRpcController();
             CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
                 new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
             instance.getMin(controller, requestArg, rpcCallback);
             AggregateResponse response = rpcCallback.get();
-            if (controller.failed()) {
-              throw new IOException(controller.errorText());
+            if (controller.failedOnException()) {
+              throw controller.getFailedOn();
             }
             if (response.getFirstPartCount() > 0) {
               ByteString b = response.getFirstPart(0);
@@ -371,13 +323,13 @@ public class AggregationClient implements Closeable {
         new Batch.Call<AggregateService, Long>() {
           @Override
           public Long call(AggregateService instance) throws IOException {
-            RpcController controller = new AggregationClientRpcController();
+            ServerRpcController controller = new ServerRpcController();
             CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
                 new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
             instance.getRowNum(controller, requestArg, rpcCallback);
             AggregateResponse response = rpcCallback.get();
-            if (controller.failed()) {
-              throw new IOException(controller.errorText());
+            if (controller.failedOnException()) {
+              throw controller.getFailedOn();
             }
             byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
             ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
@@ -436,14 +388,14 @@ public class AggregationClient implements Closeable {
         new Batch.Call<AggregateService, S>() {
           @Override
           public S call(AggregateService instance) throws IOException {
-            RpcController controller = new AggregationClientRpcController();
+            ServerRpcController controller = new ServerRpcController();
             // Not sure what is going on here why I have to do these casts. TODO.
             CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
                 new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
             instance.getSum(controller, requestArg, rpcCallback);
             AggregateResponse response = rpcCallback.get();
-            if (controller.failed()) {
-              throw new IOException(controller.errorText());
+            if (controller.failedOnException()) {
+              throw controller.getFailedOn();
             }
             if (response.getFirstPartCount() == 0) {
               return null;
@@ -504,13 +456,13 @@ public class AggregationClient implements Closeable {
         new Batch.Call<AggregateService, Pair<S, Long>>() {
           @Override
           public Pair<S, Long> call(AggregateService instance) throws IOException {
-            RpcController controller = new AggregationClientRpcController();
+            ServerRpcController controller = new ServerRpcController();
             CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
                 new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
             instance.getAvg(controller, requestArg, rpcCallback);
             AggregateResponse response = rpcCallback.get();
-            if (controller.failed()) {
-              throw new IOException(controller.errorText());
+            if (controller.failedOnException()) {
+              throw controller.getFailedOn();
             }
             Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
             if (response.getFirstPartCount() == 0) {
@@ -608,13 +560,13 @@ public class AggregationClient implements Closeable {
         new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
           @Override
           public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
-            RpcController controller = new AggregationClientRpcController();
+            ServerRpcController controller = new ServerRpcController();
             CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
                 new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
             instance.getStd(controller, requestArg, rpcCallback);
             AggregateResponse response = rpcCallback.get();
-            if (controller.failed()) {
-              throw new IOException(controller.errorText());
+            if (controller.failedOnException()) {
+              throw controller.getFailedOn();
             }
             Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
             if (response.getFirstPartCount() == 0) {
@@ -724,13 +676,13 @@ public class AggregationClient implements Closeable {
         new Batch.Call<AggregateService, List<S>>() {
           @Override
           public List<S> call(AggregateService instance) throws IOException {
-            RpcController controller = new AggregationClientRpcController();
+            ServerRpcController controller = new ServerRpcController();
             CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
                 new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
             instance.getMedian(controller, requestArg, rpcCallback);
             AggregateResponse response = rpcCallback.get();
-            if (controller.failed()) {
-              throw new IOException(controller.errorText());
+            if (controller.failedOnException()) {
+              throw controller.getFailedOn();
             }
 
             List<S> list = new ArrayList<S>();