You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/01/31 07:20:46 UTC
hbase git commit: HBASE-17346 Add coprocessor service support
Repository: hbase
Updated Branches:
refs/heads/master 2511cc827 -> b7fc7bf24
HBASE-17346 Add coprocessor service support
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b7fc7bf2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b7fc7bf2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b7fc7bf2
Branch: refs/heads/master
Commit: b7fc7bf246934cea09e22f55bea62415f7319647
Parents: 2511cc8
Author: zhangduo <zh...@apache.org>
Authored: Mon Jan 30 16:23:04 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Jan 31 15:19:33 2017 +0800
----------------------------------------------------------------------
.../client/ClientCoprocessorRpcController.java | 74 +++
.../hadoop/hbase/client/RawAsyncTable.java | 189 ++++++++
.../hadoop/hbase/client/RawAsyncTableImpl.java | 95 ++++
.../client/RegionCoprocessorRpcChannelImpl.java | 117 +++++
.../client/coprocessor/AggregationClient.java | 88 +---
.../client/coprocessor/AggregationHelper.java | 109 +++++
.../coprocessor/AsyncAggregationClient.java | 464 +++++++++++++++++++
.../coprocessor/AggregateImplementation.java | 17 +-
.../client/TestAsyncAggregationClient.java | 167 +++++++
9 files changed, 1233 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java
new file mode 100644
index 0000000..149e1d3
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Client side rpc controller for coprocessor implementation. It is only used to pass the error of
+ * a coprocessor call back to the caller; cancellation and string based failure reporting are not
+ * supported and throw {@link UnsupportedOperationException}.
+ */
+@InterfaceAudience.Private
+public class ClientCoprocessorRpcController implements RpcController {
+
+  /** The error of the call, or {@code null} if the call has not failed. */
+  private Throwable error;
+
+  /**
+   * Reset this controller to its initial state by clearing any recorded error, so that it can be
+   * reused for a new call as required by the {@link RpcController#reset()} contract.
+   */
+  @Override
+  public void reset() {
+    error = null;
+  }
+
+  /**
+   * @return {@code true} if an error has been recorded via {@link #setFailed(Throwable)}
+   */
+  @Override
+  public boolean failed() {
+    return error != null;
+  }
+
+  /**
+   * @return the message of the recorded error, or {@code null} if the call has not failed
+   */
+  @Override
+  public String errorText() {
+    return error != null ? error.getMessage() : null;
+  }
+
+  /** Cancellation is not supported by this controller. */
+  @Override
+  public void startCancel() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Not supported; use {@link #setFailed(Throwable)} to record an error instead. */
+  @Override
+  public void setFailed(String reason) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Cancellation is not supported, so a call is never canceled. */
+  @Override
+  public boolean isCanceled() {
+    return false;
+  }
+
+  /** Cancellation is not supported by this controller. */
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> callback) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Record the error of the call.
+   * @param error the failure cause
+   */
+  public void setFailed(Throwable error) {
+    this.error = error;
+  }
+
+  /**
+   * @return the recorded error, or {@code null} if the call has not failed
+   */
+  public Throwable getFailed() {
+    return error;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
index 67099e8..59924cf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
@@ -17,6 +17,14 @@
*/
package org.apache.hadoop.hbase.client;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -60,4 +68,185 @@ public interface RawAsyncTable extends AsyncTableBase {
* @param consumer the consumer used to receive results.
*/
void scan(Scan scan, RawScanResultConsumer consumer);
+
+ /**
+ * Delegate to a protobuf rpc call.
+ * <p>
+ * Usually, it is just a simple lambda expression, like:
+ *
+ * <pre>
+ * <code>
+ * (stub, controller, rpcCallback) -&gt; {
+ * XXXRequest request = ...; // prepare the request
+ * stub.xxx(controller, request, rpcCallback);
+ * }
+ * </code>
+ * </pre>
+ *
+ * And if you can prepare the {@code request} before calling the coprocessorService method, the
+ * lambda expression will be:
+ *
+ * <pre>
+ * <code>
+ * (stub, controller, rpcCallback) -&gt; stub.xxx(controller, request, rpcCallback)
+ * </code>
+ * </pre>
+ *
+ * @param <S> the type of the asynchronous stub, as produced by the {@code stubMaker}
+ * @param <R> the type of the rpc response delivered to the callback
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ @FunctionalInterface
+ interface CoprocessorCallable<S, R> {
+
+ /**
+ * Represent the actual protobuf rpc call.
+ * <p>
+ * Pass {@code controller} and {@code rpcCallback} to the stub as-is: when {@code rpcCallback}
+ * runs, the caller checks {@code controller.failed()} to decide whether the call succeeded.
+ * @param stub the asynchronous stub
+ * @param controller the rpc controller, has already been prepared for you
+ * @param rpcCallback the rpc callback, has already been prepared for you
+ */
+ void call(S stub, RpcController controller, RpcCallback<R> rpcCallback);
+ }
+
+ /**
+ * Execute the given coprocessor call on the region which contains the given {@code row}.
+ * <p>
+ * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
+ * one line lambda expression, like:
+ *
+ * <pre>
+ * <code>
+ * channel -&gt; xxxService.newStub(channel)
+ * </code>
+ * </pre>
+ *
+ * @param stubMaker a delegation to the actual {@code newStub} call.
+ * @param callable a delegation to the actual protobuf rpc call. See the comment of
+ * {@link CoprocessorCallable} for more details.
+ * @param row The row key used to identify the remote region location
+ * @param <S> the type of the asynchronous stub
+ * @param <R> the type of the return value
+ * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}. The
+ * future completes exceptionally with the error recorded on the rpc controller if the call
+ * fails.
+ * @see CoprocessorCallable
+ */
+ <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+ CoprocessorCallable<S, R> callable, byte[] row);
+
+ /**
+ * The callback when we want to execute a coprocessor call on a range of regions.
+ * <p>
+ * As the locating itself also takes some time, the implementation may want to send rpc calls on
+ * the fly, which means we do not know how many regions we have when we get the return value of
+ * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
+ * passed all the return values to you (through the {@link #onRegionComplete(HRegionInfo, Object)}
+ * or {@link #onRegionError(HRegionInfo, Throwable)} calls), i.e. there will be no
+ * {@link #onRegionComplete(HRegionInfo, Object)} or
+ * {@link #onRegionError(HRegionInfo, Throwable)} calls in the future.
+ * <p>
+ * Here is a pseudo code to describe a typical implementation of a range coprocessor service
+ * method to help you better understand how the {@link CoprocessorCallback} will be called. The
+ * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the
+ * {@code whenComplete} is {@code CompletableFuture.whenComplete}. The pending call counter is
+ * always decremented when a call finishes, before checking whether locating has ended.
+ *
+ * <pre>
+ * locateThenCall(byte[] row) {
+ * locate(row).whenComplete((location, locateError) -&gt; {
+ * if (locateError != null) {
+ * callback.onError(locateError);
+ * return;
+ * }
+ * incPendingCall();
+ * region = location.getRegion();
+ * if (region.getEndKey() &gt; endKey) {
+ * locateEnd = true;
+ * } else {
+ * locateThenCall(region.getEndKey());
+ * }
+ * sendCall().whenComplete((resp, error) -&gt; {
+ * if (error != null) {
+ * callback.onRegionError(region, error);
+ * } else {
+ * callback.onRegionComplete(region, resp);
+ * }
+ * if (decPendingCallAndGet() == 0 &amp;&amp; locateEnd) {
+ * callback.onComplete();
+ * }
+ * });
+ * });
+ * }
+ * </pre>
+ *
+ * @param <R> the type of the per-region response
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Unstable
+ interface CoprocessorCallback<R> {
+
+ /**
+ * @param region the region that the response belongs to
+ * @param resp the response of the coprocessor call
+ */
+ void onRegionComplete(HRegionInfo region, R resp);
+
+ /**
+ * @param region the region that the error belongs to
+ * @param error the response error of the coprocessor call
+ */
+ void onRegionError(HRegionInfo region, Throwable error);
+
+ /**
+ * Indicate that all responses of the regions have been notified by calling
+ * {@link #onRegionComplete(HRegionInfo, Object)} or
+ * {@link #onRegionError(HRegionInfo, Throwable)}.
+ */
+ void onComplete();
+
+ /**
+ * Indicate that we got an error which does not belong to any regions. Usually a locating error.
+ * @param error the error that is not tied to a specific region
+ */
+ void onError(Throwable error);
+ }
+
+ /**
+ * Execute the given coprocessor call on the regions which are covered by the range from
+ * {@code startKey} inclusive and {@code endKey} exclusive. See the comment of
+ * {@link #coprocessorService(Function, CoprocessorCallable, byte[], boolean, byte[], boolean, CoprocessorCallback)}
+ * for more details.
+ * @see #coprocessorService(Function, CoprocessorCallable, byte[], boolean, byte[], boolean,
+ * CoprocessorCallback)
+ */
+ default <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
+ CoprocessorCallable<S, R> callable, byte[] startKey, byte[] endKey,
+ CoprocessorCallback<R> callback) {
+ coprocessorService(stubMaker, callable, startKey, true, endKey, false, callback);
+ }
+
+ /**
+ * Execute the given coprocessor call on the regions which are covered by the range from
+ * {@code startKey} and {@code endKey}. The inclusive of boundaries are specified by
+ * {@code startKeyInclusive} and {@code endKeyInclusive}. The {@code stubMaker} is just a
+ * delegation to the {@code xxxService.newStub} call. Usually it is only a one line lambda
+ * expression, like:
+ *
+ * <pre>
+ * <code>
+ * channel -&gt; xxxService.newStub(channel)
+ * </code>
+ * </pre>
+ *
+ * @param stubMaker a delegation to the actual {@code newStub} call.
+ * @param callable a delegation to the actual protobuf rpc call. See the comment of
+ * {@link CoprocessorCallable} for more details.
+ * @param startKey start region selection with region containing this row. If {@code null}, the
+ * selection will start with the first table region.
+ * @param startKeyInclusive whether to include the startKey
+ * @param endKey select regions up to and including the region containing this row. If
+ * {@code null}, selection will continue through the last table region.
+ * @param endKeyInclusive whether to include the endKey. When it is {@code false} and
+ * {@code endKey} is exactly the end key of a region, that region is the last one selected,
+ * i.e. the region starting at {@code endKey} is not called.
+ * @param callback callback to get the response. See the comment of {@link CoprocessorCallback}
+ * for more details.
+ * @param <S> the type of the asynchronous stub
+ * @param <R> the type of the return value
+ * @see CoprocessorCallable
+ * @see CoprocessorCallback
+ */
+ <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
+ CoprocessorCallable<S, R> callable, byte[] startKey, boolean startKeyInclusive, byte[] endKey,
+ boolean endKeyInclusive, CoprocessorCallback<R> callback);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 87323ac..00f255e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -18,17 +18,26 @@
package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+
+import com.google.protobuf.RpcChannel;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -436,4 +445,90 @@ class RawAsyncTableImpl implements RawAsyncTable {
return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
}
+  /**
+   * Execute a single coprocessor call through a channel bound to the region containing
+   * {@code row}.
+   * @param region the expected region; when not {@code null} the channel fails the call if the
+   *          located region has a different name. Pass {@code null} to skip this check.
+   * @param row the row used to locate the target region
+   * @return a future completed with the rpc response, or completed exceptionally with the error
+   *         recorded on the controller
+   */
+  private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, HRegionInfo region, byte[] row) {
+    S stub = stubMaker.apply(new RegionCoprocessorRpcChannelImpl(conn, tableName, region, row,
+      rpcTimeoutNs, operationTimeoutNs));
+    CompletableFuture<R> result = new CompletableFuture<>();
+    ClientCoprocessorRpcController rpcController = new ClientCoprocessorRpcController();
+    callable.call(stub, rpcController, response -> {
+      if (!rpcController.failed()) {
+        result.complete(response);
+        return;
+      }
+      result.completeExceptionally(rpcController.getFailed());
+    });
+    return result;
+  }
+
+  @Override
+  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] row) {
+    // No expected region: whichever region currently contains the row serves the call.
+    HRegionInfo noExpectedRegion = null;
+    return coprocessorService(stubMaker, callable, noExpectedRegion, row);
+  }
+
+  /**
+   * Decide whether {@code region} is the last region to call for a range ending at
+   * {@code endKey}.
+   * @param region the region just located
+   * @param endKey the end of the range; an empty key means "through the last region"
+   * @param endKeyInclusive whether {@code endKey} itself belongs to the range
+   * @return {@code true} if no further region needs to be located
+   */
+  private boolean locateFinished(HRegionInfo region, byte[] endKey, boolean endKeyInclusive) {
+    byte[] regionEndKey = region.getEndKey();
+    if (isEmptyStopRow(regionEndKey)) {
+      // The last region of the table: there is nothing after it to locate.
+      return true;
+    }
+    if (isEmptyStopRow(endKey)) {
+      // Open-ended range and this is not the last region yet.
+      return false;
+    }
+    int cmp = Bytes.compareTo(endKey, regionEndKey);
+    // Done if the region contains endKey, or if endKey equals the region's end key and endKey
+    // itself is excluded.
+    return cmp < 0 || (cmp == 0 && !endKeyInclusive);
+  }
+
+  /**
+   * Handle the result of one region locate request during a range coprocessor call. On success
+   * it sends the coprocessor rpc to the located region and, unless that region is the last one in
+   * the range, issues the locate request for the next region.
+   * <p>
+   * {@code unfinishedRequest} counts outstanding coprocessor rpcs plus one reservation for every
+   * next-region locate request that is still in flight. So the counter can only reach zero when
+   * every rpc has finished and no locate is pending. Whoever decrements it to zero after
+   * {@code locateFinished} has been set calls {@code callback.onComplete()}, and that happens
+   * exactly once.
+   */
+  private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback,
+      List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive,
+      AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc,
+      Throwable error) {
+    if (error != null) {
+      callback.onError(error);
+      return;
+    }
+    // Account for the coprocessor rpc sent to this region below.
+    unfinishedRequest.incrementAndGet();
+    HRegionInfo region = loc.getRegionInfo();
+    if (locateFinished(region, endKey, endKeyInclusive)) {
+      locateFinished.set(true);
+    } else {
+      // Reserve a pending count for the next locate. Without it, an rpc finishing while the next
+      // locate is in flight could drop the counter to zero, then observe locateFinished flipping
+      // to true and call onComplete too early (and a second time later).
+      unfinishedRequest.incrementAndGet();
+      conn.getLocator()
+          .getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
+            operationTimeoutNs)
+          .whenComplete((l, e) -> {
+            try {
+              onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
+                locateFinished, unfinishedRequest, l, e);
+            } finally {
+              // Release the reservation only after the recursive call registered its own rpc.
+              releasePendingRequest(callback, locateFinished, unfinishedRequest);
+            }
+          });
+    }
+    coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> {
+      if (e != null) {
+        callback.onRegionError(region, e);
+      } else {
+        callback.onRegionComplete(region, r);
+      }
+      releasePendingRequest(callback, locateFinished, unfinishedRequest);
+    });
+  }
+
+  /**
+   * Release one pending count. If it was the last one and all regions of the range have been
+   * located, notify {@code callback} that the whole range call is complete.
+   */
+  private static void releasePendingRequest(CoprocessorCallback<?> callback,
+      AtomicBoolean locateFinished, AtomicInteger unfinishedRequest) {
+    if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
+      callback.onComplete();
+    }
+  }
+
+  @Override
+  public <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] startKey, boolean startKeyInclusive, byte[] endKey,
+      boolean endKeyInclusive, CoprocessorCallback<R> callback) {
+    // A null boundary means "from the first region" / "through the last region".
+    byte[] fromKey = startKey != null ? startKey : EMPTY_START_ROW;
+    byte[] toKey = endKey != null ? endKey : EMPTY_END_ROW;
+    RegionLocateType firstLocateType =
+        startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER;
+    List<HRegionLocation> locs = new ArrayList<>();
+    conn.getLocator().getRegionLocation(tableName, fromKey, firstLocateType, operationTimeoutNs)
+        .whenComplete((loc, error) -> {
+          AtomicBoolean locateFinished = new AtomicBoolean(false);
+          AtomicInteger unfinishedRequest = new AtomicInteger(0);
+          onLocateComplete(stubMaker, callable, callback, locs, toKey, endKeyInclusive,
+            locateFinished, unfinishedRequest, loc, error);
+        });
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
new file mode 100644
index 0000000..28a5564
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * The implementation of a region based coprocessor rpc channel.
+ * <p>
+ * Each {@link #callMethod} wraps the request into a {@code CoprocessorServiceRequest} and sends
+ * it through {@code ClientService.execService} to the region containing {@link #row}. It uses
+ * the connection's single-request caller with the configured rpc and operation timeouts.
+ */
+@InterfaceAudience.Private
+class RegionCoprocessorRpcChannelImpl implements RpcChannel {
+
+  private final AsyncConnectionImpl conn;
+
+  private final TableName tableName;
+
+  /**
+   * The region the call is expected to reach, or {@code null} to accept whichever region
+   * currently contains {@link #row}.
+   */
+  private final HRegionInfo region;
+
+  /** The row used to locate the target region. */
+  private final byte[] row;
+
+  private final long rpcTimeoutNs;
+
+  private final long operationTimeoutNs;
+
+  RegionCoprocessorRpcChannelImpl(AsyncConnectionImpl conn, TableName tableName, HRegionInfo region,
+      byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
+    this.conn = conn;
+    this.tableName = tableName;
+    this.region = region;
+    this.row = row;
+    this.rpcTimeoutNs = rpcTimeoutNs;
+    this.operationTimeoutNs = operationTimeoutNs;
+  }
+
+  /**
+   * Send one attempt of the coprocessor call to the located region.
+   * <p>
+   * If an expected {@link #region} was given and the located region has a different name, the
+   * returned future fails with a {@link DoNotRetryIOException} without sending anything.
+   */
+  private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
+      Message responsePrototype, HBaseRpcController controller, HRegionLocation loc,
+      ClientService.Interface stub) {
+    CompletableFuture<Message> future = new CompletableFuture<>();
+    if (region != null
+        && !Bytes.equals(loc.getRegionInfo().getRegionName(), region.getRegionName())) {
+      future.completeExceptionally(new DoNotRetryIOException(
+          "Region name is changed, expected " + region.getRegionNameAsString() + ", actual "
+              + loc.getRegionInfo().getRegionNameAsString()));
+      return future;
+    }
+    CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method,
+      request, row, loc.getRegionInfo().getRegionName());
+    stub.execService(controller, csr,
+      new org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
+
+        @Override
+        public void run(CoprocessorServiceResponse resp) {
+          if (controller.failed()) {
+            future.completeExceptionally(controller.getFailed());
+          } else {
+            try {
+              future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
+            } catch (IOException e) {
+              future.completeExceptionally(e);
+            }
+          }
+        }
+      });
+    return future;
+  }
+
+  /**
+   * Execute the coprocessor method on the region containing {@link #row}.
+   * <p>
+   * On failure the error is recorded on {@code controller} and {@code done} is invoked with
+   * {@code null}. {@code done} is always invoked, even if recording the error fails.
+   */
+  @Override
+  public void callMethod(MethodDescriptor method, RpcController controller, Message request,
+      Message responsePrototype, RpcCallback<Message> done) {
+    conn.callerFactory.<Message> single().table(tableName).row(row)
+        .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+        .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call()
+        .whenComplete((r, e) -> {
+          try {
+            if (e != null) {
+              recordFailure(controller, e);
+            }
+          } finally {
+            // An exception escaping here would be swallowed by whenComplete, and skipping done
+            // would leave the caller waiting forever.
+            done.run(r);
+          }
+        });
+  }
+
+  /**
+   * Record {@code error} on {@code controller}. Use the Throwable-carrying API of
+   * {@link ClientCoprocessorRpcController} when possible, otherwise fall back to the generic
+   * {@link RpcController#setFailed(String)} instead of failing with a ClassCastException.
+   */
+  private static void recordFailure(RpcController controller, Throwable error) {
+    if (controller instanceof ClientCoprocessorRpcController) {
+      ((ClientCoprocessorRpcController) controller).setFailed(error);
+    } else if (controller != null) {
+      controller.setFailed(error.toString());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
index 1eda730..304722e 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
@@ -19,12 +19,16 @@
package org.apache.hadoop.hbase.client.coprocessor;
+import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
+import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
import java.io.Closeable;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -49,18 +53,12 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-
/**
* This client class is for invoking the aggregate functions deployed on the
* Region Server side via the AggregateService. This class will implement the
@@ -227,23 +225,7 @@ public class AggregationClient implements Closeable {
return aMaxCallBack.getMax();
}
- /*
- * @param scan
- * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
- */
- private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
- if (scan == null
- || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes.equals(
- scan.getStartRow(), HConstants.EMPTY_START_ROW))
- || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) && !Bytes.equals(
- scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
- throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow");
- } else if (!canFamilyBeAbsent) {
- if (scan.getFamilyMap().size() != 1) {
- throw new IOException("There must be only one family.");
- }
- }
- }
+
/**
* It gives the minimum value of a column for a given column family for the
@@ -846,22 +828,6 @@ public class AggregationClient implements Closeable {
return null;
}
- <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
- validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
- throws IOException {
- validateParameters(scan, canFamilyBeAbsent);
- final AggregateRequest.Builder requestBuilder =
- AggregateRequest.newBuilder();
- requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
- P columnInterpreterSpecificData = null;
- if ((columnInterpreterSpecificData = ci.getRequestData())
- != null) {
- requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
- }
- requestBuilder.setScan(ProtobufUtil.toScan(scan));
- return requestBuilder.build();
- }
-
byte[] getBytesFromResponse(ByteString response) {
ByteBuffer bb = response.asReadOnlyByteBuffer();
bb.rewind();
@@ -873,40 +839,4 @@ public class AggregationClient implements Closeable {
}
return bytes;
}
-
- /**
- * Get an instance of the argument type declared in a class's signature. The
- * argument type is assumed to be a PB Message subclass, and the instance is
- * created using parseFrom method on the passed ByteString.
- * @param runtimeClass the runtime type of the class
- * @param position the position of the argument in the class declaration
- * @param b the ByteString which should be parsed to get the instance created
- * @return the instance
- * @throws IOException
- */
- @SuppressWarnings("unchecked")
- // Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO.
- public static <T extends Message>
- T getParsedGenericInstance(Class<?> runtimeClass, int position, ByteString b)
- throws IOException {
- Type type = runtimeClass.getGenericSuperclass();
- Type argType = ((ParameterizedType)type).getActualTypeArguments()[position];
- Class<T> classType = (Class<T>)argType;
- T inst;
- try {
- Method m = classType.getMethod("parseFrom", ByteString.class);
- inst = (T)m.invoke(null, b);
- return inst;
- } catch (SecurityException e) {
- throw new IOException(e);
- } catch (NoSuchMethodException e) {
- throw new IOException(e);
- } catch (IllegalArgumentException e) {
- throw new IOException(e);
- } catch (InvocationTargetException e) {
- throw new IOException(e);
- } catch (IllegalAccessException e) {
- throw new IOException(e);
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java
new file mode 100644
index 0000000..b91128c
--- /dev/null
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Helper class for constructing aggregation request and response.
+ */
+@InterfaceAudience.Private
+public class AggregationHelper {
+
+  /**
+   * Check that the scan describes a valid aggregation range.
+   * @param scan the scan to validate; must not be {@code null}, and its start row must be smaller
+   *          than its stop row unless the stop row is empty
+   * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan; when
+   *          {@code false} the scan must specify exactly one family
+   * @throws IOException if the scan is invalid
+   */
+  private static void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
+    if (scan == null
+        || (Bytes.equals(scan.getStartRow(), scan.getStopRow())
+            && !Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
+        || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0)
+            && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
+      throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow");
+    } else if (!canFamilyBeAbsent) {
+      if (scan.getFamilyMap().size() != 1) {
+        throw new IOException("There must be only one family.");
+      }
+    }
+  }
+
+  /**
+   * Validate the scan and build the {@link AggregateRequest} sent to the aggregation endpoint.
+   * @param scan the scan describing the rows to aggregate
+   * @param ci the column interpreter; its class name and optional request data are sent along
+   * @param canFamilyBeAbsent whether the scan may omit the column family
+   * @return the request protobuf
+   * @throws IOException if the scan is invalid
+   */
+  static <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
+      validateArgAndGetPB(Scan scan, ColumnInterpreter<R, S, P, Q, T> ci, boolean canFamilyBeAbsent)
+      throws IOException {
+    validateParameters(scan, canFamilyBeAbsent);
+    final AggregateRequest.Builder requestBuilder = AggregateRequest.newBuilder();
+    requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
+    P columnInterpreterSpecificData = ci.getRequestData();
+    if (columnInterpreterSpecificData != null) {
+      requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
+    }
+    requestBuilder.setScan(ProtobufUtil.toScan(scan));
+    return requestBuilder.build();
+  }
+
+  /**
+   * Get an instance of the argument type declared in a class's signature. The argument type is
+   * assumed to be a PB Message subclass, and the instance is created using parseFrom method on the
+   * passed ByteString.
+   * @param runtimeClass the runtime type of the class; its direct superclass must be a
+   *          parameterized type whose argument at {@code position} is a concrete class
+   * @param position the position of the argument in the class declaration
+   * @param b the ByteString which should be parsed to get the instance created
+   * @return the instance
+   * @throws IOException if the type argument cannot be resolved to a class or parsing fails
+   */
+  @SuppressWarnings("unchecked")
+  // Used server-side too by Aggregation Coprocessor Endpoint. Undo this interdependence. TODO.
+  public static <T extends Message> T getParsedGenericInstance(Class<?> runtimeClass, int position,
+      ByteString b) throws IOException {
+    Type type = runtimeClass.getGenericSuperclass();
+    if (!(type instanceof ParameterizedType)) {
+      throw new IOException("The superclass of " + runtimeClass.getName()
+          + " is not a parameterized type: " + type);
+    }
+    Type argType = ((ParameterizedType) type).getActualTypeArguments()[position];
+    if (!(argType instanceof Class)) {
+      throw new IOException("Type argument " + position + " of the superclass of "
+          + runtimeClass.getName() + " is not a concrete class: " + argType);
+    }
+    Class<T> classType = (Class<T>) argType;
+    try {
+      Method m = classType.getMethod("parseFrom", ByteString.class);
+      return (T) m.invoke(null, b);
+    } catch (SecurityException | NoSuchMethodException | IllegalArgumentException
+        | InvocationTargetException | IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
new file mode 100644
index 0000000..f8d0a19
--- /dev/null
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
+import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
+
+import com.google.protobuf.Message;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.RawAsyncTable;
+import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback;
+import org.apache.hadoop.hbase.client.RawScanResultConsumer;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * This client class is for invoking the aggregate functions deployed on the Region Server side via
+ * the AggregateService. This class will implement the supporting functionality for
+ * summing/processing the individual results obtained from the AggregateService for each region.
+ * <p>
+ * All methods are asynchronous. Each returns a {@link CompletableFuture} that is completed with the
+ * aggregated result, or completed exceptionally when argument validation or a region call fails.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class AsyncAggregationClient {
+
+  /**
+   * Folds the per-region {@link AggregateResponse}s into a single value. It completes the wrapped
+   * future at most once: normally from {@link #onComplete()}, or exceptionally on the first error.
+   * All callback methods are synchronized, so subclasses keep their state in plain fields.
+   */
+  private static abstract class AbstractAggregationCallback<T>
+      implements CoprocessorCallback<AggregateResponse> {
+
+    private final CompletableFuture<T> future;
+
+    // guarded by "this"; true once the future has been completed, normally or exceptionally
+    private boolean finished = false;
+
+    protected AbstractAggregationCallback(CompletableFuture<T> future) {
+      this.future = future;
+    }
+
+    private void completeExceptionally(Throwable error) {
+      if (finished) {
+        return;
+      }
+      finished = true;
+      future.completeExceptionally(error);
+    }
+
+    @Override
+    public synchronized void onRegionError(HRegionInfo region, Throwable error) {
+      completeExceptionally(error);
+    }
+
+    @Override
+    public synchronized void onError(Throwable error) {
+      completeExceptionally(error);
+    }
+
+    /**
+     * Merge the response returned by one region into the aggregation state.
+     */
+    protected abstract void aggregate(HRegionInfo region, AggregateResponse resp)
+        throws IOException;
+
+    @Override
+    public synchronized void onRegionComplete(HRegionInfo region, AggregateResponse resp) {
+      if (finished) {
+        // already failed; aggregating further responses cannot change the outcome
+        return;
+      }
+      try {
+        aggregate(region, resp);
+      } catch (IOException | RuntimeException e) {
+        // Unchecked failures (e.g. from the ColumnInterpreter or a missing response part) also
+        // fail the future instead of escaping into the RPC framework.
+        completeExceptionally(e);
+      }
+    }
+
+    /**
+     * Produce the final result from the aggregation state. Invoked from {@link #onComplete()}.
+     */
+    protected abstract T getFinalResult();
+
+    @Override
+    public synchronized void onComplete() {
+      if (finished) {
+        return;
+      }
+      T result;
+      try {
+        result = getFinalResult();
+      } catch (RuntimeException e) {
+        completeExceptionally(e);
+        return;
+      }
+      finished = true;
+      future.complete(result);
+    }
+  }
+
+  // Decode first part #firstPartIndex of the response as a cell value (type argument Q of ci).
+  private static <R, S, P extends Message, Q extends Message, T extends Message> R
+      getCellValueFromProto(ColumnInterpreter<R, S, P, Q, T> ci, AggregateResponse resp,
+          int firstPartIndex) throws IOException {
+    Q q = getParsedGenericInstance(ci.getClass(), 3, resp.getFirstPart(firstPartIndex));
+    return ci.getCellValueFromProto(q);
+  }
+
+  // Decode first part #firstPartIndex of the response as a promoted value (type argument T of ci).
+  private static <R, S, P extends Message, Q extends Message, T extends Message> S
+      getPromotedValueFromProto(ColumnInterpreter<R, S, P, Q, T> ci, AggregateResponse resp,
+          int firstPartIndex) throws IOException {
+    T t = getParsedGenericInstance(ci.getClass(), 4, resp.getFirstPart(firstPartIndex));
+    return ci.getPromotedValueFromProto(t);
+  }
+
+  /**
+   * Compute the maximum value, according to {@code ci.compare}, of the column selected by
+   * {@code scan} over all regions the scan covers.
+   * @param table the table to query
+   * @param ci the interpreter for the column values
+   * @param scan selects the row range and the (single) column family
+   * @return a future completed with the maximum, or with {@code null} if no region reported a value
+   */
+  public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
+      max(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+    CompletableFuture<R> future = new CompletableFuture<>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, false);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) {
+
+      private R max;
+
+      @Override
+      protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException {
+        if (resp.getFirstPartCount() > 0) {
+          R result = getCellValueFromProto(ci, resp, 0);
+          if (max == null || (result != null && ci.compare(max, result) < 0)) {
+            max = result;
+          }
+        }
+      }
+
+      @Override
+      protected R getFinalResult() {
+        return max;
+      }
+    };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback),
+      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
+      callback);
+    return future;
+  }
+
+  /**
+   * Compute the minimum value, according to {@code ci.compare}, of the column selected by
+   * {@code scan} over all regions the scan covers.
+   * @param table the table to query
+   * @param ci the interpreter for the column values
+   * @param scan selects the row range and the (single) column family
+   * @return a future completed with the minimum, or with {@code null} if no region reported a value
+   */
+  public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
+      min(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+    CompletableFuture<R> future = new CompletableFuture<>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, false);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) {
+
+      private R min;
+
+      @Override
+      protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException {
+        if (resp.getFirstPartCount() > 0) {
+          R result = getCellValueFromProto(ci, resp, 0);
+          if (min == null || (result != null && ci.compare(min, result) > 0)) {
+            min = result;
+          }
+        }
+      }
+
+      @Override
+      protected R getFinalResult() {
+        return min;
+      }
+    };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback),
+      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
+      callback);
+    return future;
+  }
+
+  /**
+   * Count the rows selected by {@code scan} by summing the per-region counts.
+   * @param table the table to query
+   * @param ci the interpreter passed along with the request
+   * @param scan selects the row range; a column family is optional here
+   * @return a future completed with the total row count
+   */
+  public static <R, S, P extends Message, Q extends Message, T extends Message>
+      CompletableFuture<Long>
+      rowCount(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+    CompletableFuture<Long> future = new CompletableFuture<>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, true);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    AbstractAggregationCallback<Long> callback = new AbstractAggregationCallback<Long>(future) {
+
+      private long count;
+
+      @Override
+      protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException {
+        // guard like the other aggregations instead of failing on a missing first part
+        if (resp.getFirstPartCount() > 0) {
+          count += resp.getFirstPart(0).asReadOnlyByteBuffer().getLong();
+        }
+      }
+
+      @Override
+      protected Long getFinalResult() {
+        return count;
+      }
+    };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback),
+      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
+      callback);
+    return future;
+  }
+
+  /**
+   * Sum the column selected by {@code scan}, using {@code ci.add} on the promoted type.
+   * @param table the table to query
+   * @param ci the interpreter for the column values
+   * @param scan selects the row range and the (single) column family
+   * @return a future completed with the sum, or with {@code null} if no region reported a value
+   */
+  public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<S>
+      sum(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+    CompletableFuture<S> future = new CompletableFuture<>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, false);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    AbstractAggregationCallback<S> callback = new AbstractAggregationCallback<S>(future) {
+
+      private S sum;
+
+      @Override
+      protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException {
+        if (resp.getFirstPartCount() > 0) {
+          S s = getPromotedValueFromProto(ci, resp, 0);
+          sum = ci.add(sum, s);
+        }
+      }
+
+      @Override
+      protected S getFinalResult() {
+        return sum;
+      }
+    };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback),
+      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
+      callback);
+    return future;
+  }
+
+  /**
+   * Compute the average of the column selected by {@code scan}.
+   * @param table the table to query
+   * @param ci the interpreter for the column values
+   * @param scan selects the row range and the (single) column family
+   * @return a future completed with {@code ci.divideForAvg(sum, count)}
+   */
+  public static <R, S, P extends Message, Q extends Message, T extends Message>
+      CompletableFuture<Double>
+      avg(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+    CompletableFuture<Double> future = new CompletableFuture<>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, false);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    AbstractAggregationCallback<Double> callback = new AbstractAggregationCallback<Double>(future) {
+
+      private S sum;
+
+      private long count = 0L;
+
+      @Override
+      protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException {
+        if (resp.getFirstPartCount() > 0) {
+          sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0));
+          count += resp.getSecondPart().asReadOnlyByteBuffer().getLong();
+        }
+      }
+
+      @Override
+      protected Double getFinalResult() {
+        return ci.divideForAvg(sum, count);
+      }
+    };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback),
+      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
+      callback);
+    return future;
+  }
+
+  /**
+   * Compute the (population) standard deviation of the column selected by {@code scan}, as
+   * sqrt(avg(x^2) - avg(x)^2).
+   * @param table the table to query
+   * @param ci the interpreter for the column values
+   * @param scan selects the row range and the (single) column family
+   * @return a future completed with the standard deviation
+   */
+  public static <R, S, P extends Message, Q extends Message, T extends Message>
+      CompletableFuture<Double>
+      std(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+    CompletableFuture<Double> future = new CompletableFuture<>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, false);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    AbstractAggregationCallback<Double> callback = new AbstractAggregationCallback<Double>(future) {
+
+      private S sum;
+
+      private S sumSq;
+
+      private long count;
+
+      @Override
+      protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException {
+        if (resp.getFirstPartCount() > 0) {
+          sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0));
+          sumSq = ci.add(sumSq, getPromotedValueFromProto(ci, resp, 1));
+          count += resp.getSecondPart().asReadOnlyByteBuffer().getLong();
+        }
+      }
+
+      @Override
+      protected Double getFinalResult() {
+        double avg = ci.divideForAvg(sum, count);
+        double avgSq = ci.divideForAvg(sumSq, count);
+        // Floating point cancellation can make the variance slightly negative when all values are
+        // (nearly) equal; clamp it to 0 instead of returning NaN. A NaN variance still propagates.
+        return Math.sqrt(Math.max(0.0, avgSq - avg * avg));
+      }
+    };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback),
+      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
+      callback);
+    return future;
+  }
+
+  // The map key is the startRow of the region. The value is that region's sum of values (one
+  // qualifier) or of weights (more than one qualifier) within the scan range.
+  private static <R, S, P extends Message, Q extends Message, T extends Message>
+      CompletableFuture<NavigableMap<byte[], S>>
+      sumByRegion(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+    CompletableFuture<NavigableMap<byte[], S>> future = new CompletableFuture<>();
+    AggregateRequest req;
+    try {
+      req = validateArgAndGetPB(scan, ci, false);
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return future;
+    }
+    NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(scan.getFamilies()[0]);
+    if (qualifiers == null || qualifiers.isEmpty()) {
+      // findMedian needs explicit value/weight qualifiers; fail through the future, not with an NPE
+      future.completeExceptionally(
+        new IOException("Median requires a value qualifier and optionally a weight qualifier"));
+      return future;
+    }
+    // Part 0 is used for a plain median and part 1 for a weighted one. findMedian only looks at the
+    // first and last qualifier, so more than two qualifiers still means "weighted" and must not
+    // index past part 1. NOTE(review): confirm the two-part layout against the server's getMedian.
+    int firstPartIndex = qualifiers.size() > 1 ? 1 : 0;
+    AbstractAggregationCallback<NavigableMap<byte[], S>> callback =
+        new AbstractAggregationCallback<NavigableMap<byte[], S>>(future) {
+
+          private final NavigableMap<byte[], S> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+          @Override
+          protected void aggregate(HRegionInfo region, AggregateResponse resp)
+              throws IOException {
+            if (resp.getFirstPartCount() > 0) {
+              map.put(region.getStartKey(), getPromotedValueFromProto(ci, resp, firstPartIndex));
+            }
+          }
+
+          @Override
+          protected NavigableMap<byte[], S> getFinalResult() {
+            return map;
+          }
+        };
+    table.coprocessorService(channel -> AggregateService.newStub(channel),
+      (stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback),
+      scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
+      callback);
+    return future;
+  }
+
+  // Locate the region holding the median from the per-region sums, then scan from that region
+  // onward, row by row, until the cumulative weight passes half of the total.
+  private static <R, S, P extends Message, Q extends Message, T extends Message> void findMedian(
+      CompletableFuture<R> future, RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci,
+      Scan scan, NavigableMap<byte[], S> sumByRegion) {
+    double halfSum = ci.divideForAvg(sumByRegion.values().stream().reduce(ci::add).get(), 2L);
+    S movingSum = null;
+    byte[] startRow = null;
+    for (Map.Entry<byte[], S> entry : sumByRegion.entrySet()) {
+      startRow = entry.getKey();
+      S newMovingSum = ci.add(movingSum, entry.getValue());
+      if (ci.divideForAvg(newMovingSum, 1L) > halfSum) {
+        break;
+      }
+      movingSum = newMovingSum;
+    }
+    // startRow is the start key of the region containing the median. Only ever move the scan start
+    // forward: the per-region sums covered rows inside the scan range only. If the scan starts
+    // inside that region, restarting at the region start key would count rows outside the range
+    // and would also drop an exclusive start row setting.
+    if (startRow != null && Bytes.compareTo(startRow, scan.getStartRow()) > 0) {
+      scan.withStartRow(startRow);
+    }
+    // we can not pass movingSum directly to an anonymous class as it is not final.
+    S baseSum = movingSum;
+    byte[] family = scan.getFamilies()[0];
+    NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
+    // with a single qualifier, first() == last(): the value is also its own weight
+    byte[] weightQualifier = qualifiers.last();
+    byte[] valueQualifier = qualifiers.first();
+    table.scan(scan, new RawScanResultConsumer() {
+
+      private S sum = baseSum;
+
+      private R value = null;
+
+      @Override
+      public boolean onNext(Result[] results) {
+        try {
+          for (Result result : results) {
+            Cell weightCell = result.getColumnLatestCell(family, weightQualifier);
+            R weight = ci.getValue(family, weightQualifier, weightCell);
+            sum = ci.add(sum, ci.castToReturnType(weight));
+            if (ci.divideForAvg(sum, 1L) > halfSum) {
+              // the median is the value of the last row before the running sum crossed halfSum
+              if (value != null) {
+                future.complete(value);
+              } else {
+                future.completeExceptionally(new NoSuchElementException());
+              }
+              return false;
+            }
+            Cell valueCell = result.getColumnLatestCell(family, valueQualifier);
+            value = ci.getValue(family, valueQualifier, valueCell);
+          }
+          return true;
+        } catch (IOException e) {
+          future.completeExceptionally(e);
+          return false;
+        }
+      }
+
+      @Override
+      public void onError(Throwable error) {
+        future.completeExceptionally(error);
+      }
+
+      @Override
+      public void onComplete() {
+        if (!future.isDone()) {
+          // we should not reach here as the future should be completed in onNext.
+          future.completeExceptionally(new NoSuchElementException());
+        }
+      }
+    });
+  }
+
+  /**
+   * Compute the median of the column selected by {@code scan}.
+   * <p>
+   * The first qualifier of the family holds the values. If more than one qualifier is given, the
+   * last one holds the weights; otherwise each value is also its own weight. The result is the
+   * value of the last row before the cumulative weight exceeds half of the total weight.
+   * @param table the table to query
+   * @param ci the interpreter for the column values
+   * @param scan selects the row range, the (single) column family and its qualifiers
+   * @return a future completed with the median, or exceptionally with
+   *         {@link NoSuchElementException} if no such row exists
+   */
+  public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
+      median(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+    CompletableFuture<R> future = new CompletableFuture<>();
+    sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+      } else if (sumByRegion.isEmpty()) {
+        future.completeExceptionally(new NoSuchElementException());
+      } else {
+        try {
+          findMedian(future, table, ci, ReflectionUtils.newInstance(scan.getClass(), scan),
+            sumByRegion);
+        } catch (RuntimeException e) {
+          // Thrown here, it would only fail the discarded stage returned by whenComplete and leave
+          // the future returned to the caller incomplete.
+          future.completeExceptionally(e);
+        }
+      }
+    });
+    return future;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
index 08b0562..bccb76a 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
@@ -18,6 +18,14 @@
*/
package org.apache.hadoop.hbase.coprocessor;
+import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -31,7 +39,6 @@ import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -40,12 +47,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRespo
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-
/**
* A concrete AggregateProtocol implementation. Its system level coprocessor
* that computes the aggregate function at a region level.
@@ -485,7 +486,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance();
if (request.hasInterpreterSpecificBytes()) {
ByteString b = request.getInterpreterSpecificBytes();
- P initMsg = AggregationClient.getParsedGenericInstance(ci.getClass(), 2, b);
+ P initMsg = getParsedGenericInstance(ci.getClass(), 2, b);
ci.initialize(initMsg);
}
return ci;
http://git-wip-us.apache.org/repos/asf/hbase/blob/b7fc7bf2/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
new file mode 100644
index 0000000..1274dd5
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.coprocessor.AsyncAggregationClient;
+import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
+import org.apache.hadoop.hbase.coprocessor.AggregateImplementation;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, CoprocessorTests.class })
+public class TestAsyncAggregationClient {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("TestAsyncAggregationClient");
+
+  private static final byte[] CF = Bytes.toBytes("CF");
+
+  private static final byte[] CQ = Bytes.toBytes("CQ");
+
+  private static final byte[] CQ2 = Bytes.toBytes("CQ2");
+
+  private static final int COUNT = 1000;
+
+  private static AsyncConnection CONN;
+
+  private static RawAsyncTable TABLE;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+      AggregateImplementation.class.getName());
+    UTIL.startMiniCluster(3);
+    // split keys "111", "222", ..., "888": the table spans 9 regions
+    byte[][] splitKeys = new byte[8][];
+    for (int i = 111; i < 999; i += 111) {
+      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
+    }
+    UTIL.createTable(TABLE_NAME, CF, splitKeys);
+    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration());
+    TABLE = CONN.getRawTable(TABLE_NAME);
+    // row "%03d" of l holds l in CQ and l * l in CQ2
+    TABLE.putAll(LongStream.range(0, COUNT)
+        .mapToObj(l -> new Put(Bytes.toBytes(String.format("%03d", l)))
+            .addColumn(CF, CQ, Bytes.toBytes(l)).addColumn(CF, CQ2, Bytes.toBytes(l * l)))
+        .collect(Collectors.toList())).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    try {
+      // CONN is still null if setUp failed early; the mini cluster must be shut down regardless
+      if (CONN != null) {
+        CONN.close();
+      }
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testMax() throws InterruptedException, ExecutionException {
+    assertEquals(COUNT - 1, AsyncAggregationClient
+        .max(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().longValue());
+  }
+
+  @Test
+  public void testMin() throws InterruptedException, ExecutionException {
+    assertEquals(0, AsyncAggregationClient
+        .min(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().longValue());
+  }
+
+  @Test
+  public void testRowCount() throws InterruptedException, ExecutionException {
+    assertEquals(COUNT,
+      AsyncAggregationClient
+          .rowCount(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get()
+          .longValue());
+  }
+
+  @Test
+  public void testSum() throws InterruptedException, ExecutionException {
+    assertEquals(COUNT * (COUNT - 1) / 2, AsyncAggregationClient
+        .sum(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().longValue());
+  }
+
+  private static final double DELTA = 1E-3;
+
+  @Test
+  public void testAvg() throws InterruptedException, ExecutionException {
+    assertEquals((COUNT - 1) / 2.0, AsyncAggregationClient
+        .avg(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().doubleValue(),
+      DELTA);
+  }
+
+  @Test
+  public void testStd() throws InterruptedException, ExecutionException {
+    double avgSq = LongStream.range(0, COUNT).map(l -> l * l).sum() / (double) COUNT;
+    double avg = (COUNT - 1) / 2.0;
+    double std = Math.sqrt(avgSq - avg * avg);
+    assertEquals(std, AsyncAggregationClient
+        .std(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().doubleValue(),
+      DELTA);
+  }
+
+  @Test
+  public void testMedian() throws InterruptedException, ExecutionException {
+    // each value is its own weight: the median is the last value before the running sum of values
+    // exceeds half of the total
+    long halfSum = COUNT * (COUNT - 1) / 4;
+    long median = 0L;
+    long sum = 0L;
+    for (int i = 0; i < COUNT; i++) {
+      sum += i;
+      if (sum > halfSum) {
+        median = i - 1;
+        break;
+      }
+    }
+    assertEquals(median,
+      AsyncAggregationClient
+          .median(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get()
+          .longValue());
+  }
+
+  @Test
+  public void testMedianWithWeight() throws InterruptedException, ExecutionException {
+    // CQ2 (l * l) is the weight of value CQ (l)
+    long halfSum = LongStream.range(0, COUNT).map(l -> l * l).sum() / 2;
+    long median = 0L;
+    long sum = 0L;
+    for (int i = 0; i < COUNT; i++) {
+      sum += i * i;
+      if (sum > halfSum) {
+        median = i - 1;
+        break;
+      }
+    }
+    assertEquals(median, AsyncAggregationClient
+        .median(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ).addColumn(CF, CQ2))
+        .get().longValue());
+  }
+}