You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/09/29 19:37:54 UTC
[40/51] [partial] hbase git commit: HBASE-16264 Figure how to deal
with endpoints and shaded pb Shade our protobufs. Do it in a manner that
makes it so we can still have in our API references to com.google.protobuf
(and in REST). The c.g.p in API is for
http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
new file mode 100644
index 0000000..cde7d41
--- /dev/null
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
@@ -0,0 +1,864 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+
+/**
+ * This client class is for invoking the aggregate functions deployed on the
+ * Region Server side via the AggregateService. This class will implement the
+ * supporting functionality for summing/processing the individual results
+ * obtained from the AggregateService for each region.
+ * <p>
+ * This will serve as the client side handler for invoking the aggregate
+ * functions.
+ * For all aggregate functions,
+ * <ul>
+ * <li>start row &lt; end row is an essential condition (if they are not
+ * {@link HConstants#EMPTY_BYTE_ARRAY})</li>
+ * <li>Column family can't be null. In case where multiple families are
+ * provided, an IOException will be thrown. An optional column qualifier can
+ * also be defined.</li>
+ * <li>For methods to find maximum, minimum and sum, it returns a type
+ * parameter of the column interpreter. For average and std, it returns a
+ * double value. For row count, it returns a long value.</li>
+ * </ul>
+ * <p>Call {@link #close()} when done.
+ */
+@InterfaceAudience.Private
+public class AggregationClient implements Closeable {
+ // TODO: This class is not used. Move to examples?
+ private static final Log log = LogFactory.getLog(AggregationClient.class);
+ private final Connection connection;
+
+  /**
+   * Creates a client backed by a new {@link Connection} built from the given configuration.
+   * The connection is used by the table-name based aggregate calls of this instance and is
+   * released by {@link #close()}.
+   * @param cfg configuration handed to {@link ConnectionFactory#createConnection(Configuration)}
+   * @throws RuntimeException wrapping the {@link IOException} raised when the connection
+   *           cannot be created
+   */
+  public AggregationClient(Configuration cfg) {
+    Connection created;
+    try {
+      // One connection per client; every table-name based call below borrows it.
+      created = ConnectionFactory.createConnection(cfg);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+    this.connection = created;
+  }
+
+  /**
+   * Closes the underlying connection unless it is absent or already closed.
+   * @throws IOException if closing the connection fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (this.connection == null || this.connection.isClosed()) {
+      // Nothing left to release.
+      return;
+    }
+    this.connection.close();
+  }
+
+  /**
+   * Returns the maximum value of a column for a given column family over the range of the
+   * scan. In case qualifier is null, a max of all values for the given family is returned.
+   * The named table is opened on this client's connection for the duration of the call and
+   * the work is delegated to {@link #max(Table, ColumnInterpreter, Scan)}.
+   * @param tableName name of the table to aggregate over
+   * @param ci column interpreter used to decode and compare the values
+   * @param scan scan describing the row range and the column family
+   * @return the max value, of type {@code R}; may be {@code null}
+   * @throws Throwable the caller is supposed to handle the exception as it is thrown and
+   *           propagated to it
+   */
+  public <R, S, P extends Message, Q extends Message, T extends Message> R max(
+      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
+      throws Throwable {
+    try (Table target = connection.getTable(tableName)) {
+      final R maximum = max(target, ci, scan);
+      return maximum;
+    }
+  }
+
+ /**
+ * It gives the maximum value of a column for a given column family for the
+ * given range. In case qualifier is null, a max of all values for the given
+ * family is returned.
+ * @param table
+ * @param ci
+ * @param scan
+ * @return max val <>
+ * @throws Throwable
+ * The caller is supposed to handle the exception as they are thrown
+ * & propagated to it.
+ */
+ public <R, S, P extends Message, Q extends Message, T extends Message>
+ R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
+ final Scan scan) throws Throwable {
+ final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
+ class MaxCallBack implements Batch.Callback<R> {
+ R max = null;
+
+ R getMax() {
+ return max;
+ }
+
+ @Override
+ public synchronized void update(byte[] region, byte[] row, R result) {
+ max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max;
+ }
+ }
+ MaxCallBack aMaxCallBack = new MaxCallBack();
+ table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
+ new Batch.Call<AggregateService, R>() {
+ @Override
+ public R call(AggregateService instance) throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
+ instance.getMax(controller, requestArg, rpcCallback);
+ AggregateResponse response = rpcCallback.get();
+ if (controller.failedOnException()) {
+ throw controller.getFailedOn();
+ }
+ if (response.getFirstPartCount() > 0) {
+ ByteString b = response.getFirstPart(0);
+ Q q = getParsedGenericInstance(ci.getClass(), 3, b);
+ return ci.getCellValueFromProto(q);
+ }
+ return null;
+ }
+ }, aMaxCallBack);
+ return aMaxCallBack.getMax();
+ }
+
+ /*
+ * @param scan
+ * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
+ */
+ private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
+ if (scan == null
+ || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes.equals(
+ scan.getStartRow(), HConstants.EMPTY_START_ROW))
+ || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) && !Bytes.equals(
+ scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
+ throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow");
+ } else if (!canFamilyBeAbsent) {
+ if (scan.getFamilyMap().size() != 1) {
+ throw new IOException("There must be only one family.");
+ }
+ }
+ }
+
+  /**
+   * Returns the minimum value of a column for a given column family over the range of the
+   * scan. In case qualifier is null, a min of all values for the given family is returned.
+   * The named table is opened on this client's connection for the duration of the call and
+   * the work is delegated to {@link #min(Table, ColumnInterpreter, Scan)}.
+   * @param tableName name of the table to aggregate over
+   * @param ci column interpreter used to decode and compare the values
+   * @param scan scan describing the row range and the column family
+   * @return the min value, of type {@code R}; may be {@code null}
+   * @throws Throwable any failure raised while validating the scan or calling the regions
+   */
+  public <R, S, P extends Message, Q extends Message, T extends Message> R min(
+      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
+      throws Throwable {
+    try (Table target = connection.getTable(tableName)) {
+      final R minimum = min(target, ci, scan);
+      return minimum;
+    }
+  }
+
+ /**
+ * It gives the minimum value of a column for a given column family for the
+ * given range. In case qualifier is null, a min of all values for the given
+ * family is returned.
+ * @param table
+ * @param ci
+ * @param scan
+ * @return min val <R>
+ * @throws Throwable
+ */
+ public <R, S, P extends Message, Q extends Message, T extends Message>
+ R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
+ final Scan scan) throws Throwable {
+ final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
+ class MinCallBack implements Batch.Callback<R> {
+
+ private R min = null;
+
+ public R getMinimum() {
+ return min;
+ }
+
+ @Override
+ public synchronized void update(byte[] region, byte[] row, R result) {
+ min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
+ }
+ }
+ MinCallBack minCallBack = new MinCallBack();
+ table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
+ new Batch.Call<AggregateService, R>() {
+
+ @Override
+ public R call(AggregateService instance) throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
+ instance.getMin(controller, requestArg, rpcCallback);
+ AggregateResponse response = rpcCallback.get();
+ if (controller.failedOnException()) {
+ throw controller.getFailedOn();
+ }
+ if (response.getFirstPartCount() > 0) {
+ ByteString b = response.getFirstPart(0);
+ Q q = getParsedGenericInstance(ci.getClass(), 3, b);
+ return ci.getCellValueFromProto(q);
+ }
+ return null;
+ }
+ }, minCallBack);
+ log.debug("Min fom all regions is: " + minCallBack.getMinimum());
+ return minCallBack.getMinimum();
+ }
+
+  /**
+   * Returns the row count, obtained by summing up the individual results reported by the
+   * regions. In case the qualifier is null, FirstKeyValueFilter is used to optimise the
+   * operation. In case a qualifier is provided, that filter cannot be used: it may set the
+   * flag to skip to the next row while the value read is not of the given qualifier, and the
+   * row would then wrongly go uncounted.
+   * The named table is opened on this client's connection for the duration of the call and
+   * the work is delegated to {@link #rowCount(Table, ColumnInterpreter, Scan)}.
+   * @param tableName name of the table to count rows in
+   * @param ci column interpreter forwarded with the request
+   * @param scan scan describing the row range and, optionally, the column
+   * @return the total number of rows reported by the regions
+   * @throws Throwable any failure raised while validating the scan or calling the regions
+   */
+  public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
+      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
+      throws Throwable {
+    try (Table target = connection.getTable(tableName)) {
+      final long total = rowCount(target, ci, scan);
+      return total;
+    }
+  }
+
+  /**
+   * Returns the row count, obtained by summing up the individual results reported by the
+   * regions. In case the qualifier is null, FirstKeyValueFilter is used to optimise the
+   * operation. In case a qualifier is provided, that filter cannot be used: it may set the
+   * flag to skip to the next row while the value read is not of the given qualifier, and the
+   * row would then wrongly go uncounted.
+   * @param table table to count rows in
+   * @param ci column interpreter forwarded with the request
+   * @param scan scan describing the row range and, optionally, the column; the column family
+   *          may be absent for this call
+   * @return the total number of rows reported by the regions
+   * @throws Throwable any failure raised while validating the scan or calling the regions
+   */
+  public <R, S, P extends Message, Q extends Message, T extends Message>
+  long rowCount(final Table table,
+      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
+    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
+
+    // Adds up the per-region counts; AtomicLong keeps concurrent updates safe.
+    class RowNumCallback implements Batch.Callback<Long> {
+      private final AtomicLong rowCountL = new AtomicLong(0);
+
+      public long getRowNumCount() {
+        return rowCountL.get();
+      }
+
+      @Override
+      public void update(byte[] region, byte[] row, Long result) {
+        rowCountL.addAndGet(result.longValue());
+      }
+    }
+    RowNumCallback rowNum = new RowNumCallback();
+    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
+        new Batch.Call<AggregateService, Long>() {
+          @Override
+          public Long call(AggregateService instance) throws IOException {
+            ServerRpcController controller = new ServerRpcController();
+            CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
+                new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
+            instance.getRowNum(controller, requestArg, rpcCallback);
+            AggregateResponse response = rpcCallback.get();
+            if (controller.failedOnException()) {
+              throw controller.getFailedOn();
+            }
+            if (response.getFirstPartCount() == 0) {
+              // No count in the response: treat the region as contributing zero rows rather
+              // than failing on the index lookup below.
+              return 0L;
+            }
+            // The count is decoded from the first part as an 8-byte long.
+            byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
+            ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
+            bb.rewind();
+            return bb.getLong();
+          }
+        }, rowNum);
+    return rowNum.getRowNumCount();
+  }
+
+  /**
+   * Sums up the values returned from the various regions. In case qualifier is null,
+   * summation of all the column qualifiers in the given family is done.
+   * The named table is opened on this client's connection for the duration of the call and
+   * the work is delegated to {@link #sum(Table, ColumnInterpreter, Scan)}.
+   * @param tableName name of the table to aggregate over
+   * @param ci column interpreter used to decode and add the values
+   * @param scan scan describing the row range and the column family
+   * @return the sum, of type {@code S}; may be {@code null}
+   * @throws Throwable any failure raised while validating the scan or calling the regions
+   */
+  public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
+      final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
+      throws Throwable {
+    try (Table target = connection.getTable(tableName)) {
+      final S total = sum(target, ci, scan);
+      return total;
+    }
+  }
+
+  /**
+   * Sums up the values returned from the various regions. In case qualifier is null,
+   * summation of all the column qualifiers in the given family is done.
+   * Each region reports its own sum through the AggregateService and the partial sums are
+   * combined on the client with {@code ci.add}.
+   * @param table table to run the aggregate against
+   * @param ci column interpreter used to decode and add the values
+   * @param scan scan describing the row range and the column family
+   * @return the sum, of type {@code S}, as accumulated by {@code ci.add}
+   * @throws Throwable any failure raised while validating the scan or calling the regions
+   */
+  public <R, S, P extends Message, Q extends Message, T extends Message>
+  S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci,
+      final Scan scan) throws Throwable {
+    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
+
+    // Accumulates the per-region sums.
+    class SumCallBack implements Batch.Callback<S> {
+      S sumVal = null;
+
+      public S getSumResult() {
+        return sumVal;
+      }
+
+      @Override
+      public synchronized void update(byte[] region, byte[] row, S result) {
+        sumVal = ci.add(sumVal, result);
+      }
+    }
+
+    final SumCallBack collector = new SumCallBack();
+    final Batch.Call<AggregateService, S> perRegion = new Batch.Call<AggregateService, S>() {
+      @Override
+      public S call(AggregateService instance) throws IOException {
+        ServerRpcController controller = new ServerRpcController();
+        CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> done =
+            new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
+        instance.getSum(controller, requestArg, done);
+        AggregateResponse response = done.get();
+        if (controller.failedOnException()) {
+          throw controller.getFailedOn();
+        }
+        if (response.getFirstPartCount() == 0) {
+          return null;
+        }
+        ByteString encoded = response.getFirstPart(0);
+        T parsed = getParsedGenericInstance(ci.getClass(), 4, encoded);
+        return ci.getPromotedValueFromProto(parsed);
+      }
+    };
+    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
+        perRegion, collector);
+    return collector.getSumResult();
+  }
+
+ /**
+ * It computes average while fetching sum and row count from all the
+ * corresponding regions. Approach is to compute a global sum of region level
+ * sum and rowcount and then compute the average.
+ * @param tableName
+ * @param scan
+ * @throws Throwable
+ */
+ private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
+ final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
+ throws Throwable {
+ try (Table table = connection.getTable(tableName)) {
+ return getAvgArgs(table, ci, scan);
+ }
+ }
+
+  /**
+   * Collects the inputs of an average: the global sum of the region level sums and the
+   * global row count. The average itself is computed by the caller.
+   * @param table table to run the aggregate against
+   * @param ci column interpreter used to decode and add the values
+   * @param scan scan describing the row range and the column family
+   * @return pair of (sum, row count); the sum is whatever {@code ci.add} accumulated
+   * @throws Throwable any failure raised while validating the scan or calling the regions
+   */
+  private <R, S, P extends Message, Q extends Message, T extends Message>
+  Pair<S, Long> getAvgArgs(final Table table,
+      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
+    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
+    class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
+      S sum = null;
+      // Primitive accumulator: avoids unboxing and re-boxing a Long on every update.
+      long rowCount = 0L;
+
+      public synchronized Pair<S, Long> getAvgArgs() {
+        return new Pair<S, Long>(sum, rowCount);
+      }
+
+      @Override
+      public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
+        sum = ci.add(sum, result.getFirst());
+        rowCount += result.getSecond();
+      }
+    }
+    AvgCallBack avgCallBack = new AvgCallBack();
+    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
+        new Batch.Call<AggregateService, Pair<S, Long>>() {
+          @Override
+          public Pair<S, Long> call(AggregateService instance) throws IOException {
+            ServerRpcController controller = new ServerRpcController();
+            CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
+                new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
+            instance.getAvg(controller, requestArg, rpcCallback);
+            AggregateResponse response = rpcCallback.get();
+            if (controller.failedOnException()) {
+              throw controller.getFailedOn();
+            }
+            Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
+            if (response.getFirstPartCount() == 0) {
+              // Nothing reported by this region: contribute a null sum and zero rows.
+              return pair;
+            }
+            // First part carries the region sum, second part the region row count (8 bytes).
+            ByteString b = response.getFirstPart(0);
+            T t = getParsedGenericInstance(ci.getClass(), 4, b);
+            S s = ci.getPromotedValueFromProto(t);
+            pair.setFirst(s);
+            ByteBuffer bb = ByteBuffer.allocate(8).put(
+                getBytesFromResponse(response.getSecondPart()));
+            bb.rewind();
+            pair.setSecond(bb.getLong());
+            return pair;
+          }
+        }, avgCallBack);
+    return avgCallBack.getAvgArgs();
+  }
+
+ /**
+ * This is the client side interface/handle for calling the average method for
+ * a given cf-cq combination. It was necessary to add one more call stack as
+ * its return type should be a decimal value, irrespective of what
+ * columninterpreter says. So, this methods collects the necessary parameters
+ * to compute the average and returs the double value.
+ * @param tableName
+ * @param ci
+ * @param scan
+ * @return <R, S>
+ * @throws Throwable
+ */
+ public <R, S, P extends Message, Q extends Message, T extends Message>
+ double avg(final TableName tableName,
+ final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
+ Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
+ return ci.divideForAvg(p.getFirst(), p.getSecond());
+ }
+
+  /**
+   * Client side handle for computing the average of a given cf-cq combination. The extra
+   * call level exists because the result must be a decimal value irrespective of what the
+   * column interpreter says: this method collects the sum and the row count and returns
+   * their quotient as a double.
+   * @param table table to run the aggregate against
+   * @param ci column interpreter used to decode, add and divide the values
+   * @param scan scan describing the row range and the column family
+   * @return the average as computed by {@code ci.divideForAvg(sum, rowCount)}
+   * @throws Throwable any failure raised while validating the scan or calling the regions
+   */
+  public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
+      final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
+    final Pair<S, Long> sumAndCount = getAvgArgs(table, ci, scan);
+    final S total = sumAndCount.getFirst();
+    final Long rows = sumAndCount.getSecond();
+    return ci.divideForAvg(total, rows);
+  }
+
+  /**
+   * Collects the inputs of a global standard deviation for a given column. Standard
+   * deviation is the square root of (average of squares - average*average); from the
+   * individual regions this method obtains the sum, the sum of squares and the number of
+   * rows, from which the caller derives the global std.
+   * @param table table to run the aggregate against
+   * @param ci column interpreter used to decode and add the values
+   * @param scan scan describing the row range and the column family
+   * @return pair whose first element is the list (sum, sum of squares) and whose second
+   *         element is the row count
+   * @throws Throwable any failure raised while validating the scan or calling the regions
+   */
+  private <R, S, P extends Message, Q extends Message, T extends Message>
+  Pair<List<S>, Long> getStdArgs(final Table table,
+      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
+    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
+
+    // Accumulates sum, sum of squares and row count over all regions.
+    class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
+      long rowCountVal = 0L;
+      S sumVal = null;
+      S sumSqVal = null;
+
+      public synchronized Pair<List<S>, Long> getStdParams() {
+        List<S> sums = new ArrayList<S>();
+        sums.add(sumVal);
+        sums.add(sumSqVal);
+        return new Pair<List<S>, Long>(sums, rowCountVal);
+      }
+
+      @Override
+      public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
+        List<S> regionSums = result.getFirst();
+        if (regionSums.isEmpty()) {
+          // Region reported nothing; leave the totals untouched.
+          return;
+        }
+        sumVal = ci.add(sumVal, regionSums.get(0));
+        sumSqVal = ci.add(sumSqVal, regionSums.get(1));
+        rowCountVal += result.getSecond();
+      }
+    }
+
+    final StdCallback collector = new StdCallback();
+    final Batch.Call<AggregateService, Pair<List<S>, Long>> perRegion =
+        new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
+          @Override
+          public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
+            ServerRpcController controller = new ServerRpcController();
+            CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> done =
+                new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
+            instance.getStd(controller, requestArg, done);
+            AggregateResponse response = done.get();
+            if (controller.failedOnException()) {
+              throw controller.getFailedOn();
+            }
+            final int parts = response.getFirstPartCount();
+            if (parts == 0) {
+              return new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
+            }
+            List<S> sums = new ArrayList<S>();
+            for (int i = 0; i < parts; i++) {
+              T parsed = getParsedGenericInstance(ci.getClass(), 4, response.getFirstPart(i));
+              sums.add(ci.getPromotedValueFromProto(parsed));
+            }
+            // The second part holds the region row count as an 8-byte long.
+            ByteBuffer countBytes = ByteBuffer.allocate(8).put(
+                getBytesFromResponse(response.getSecondPart()));
+            countBytes.rewind();
+            return new Pair<List<S>, Long>(sums, countBytes.getLong());
+          }
+        };
+    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
+        perRegion, collector);
+    return collector.getStdParams();
+  }
+
+ /**
+ * This is the client side interface/handle for calling the std method for a
+ * given cf-cq combination. It was necessary to add one more call stack as its
+ * return type should be a decimal value, irrespective of what
+ * columninterpreter says. So, this methods collects the necessary parameters
+ * to compute the std and returns the double value.
+ * @param tableName
+ * @param ci
+ * @param scan
+ * @return <R, S>
+ * @throws Throwable
+ */
+ public <R, S, P extends Message, Q extends Message, T extends Message>
+ double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
+ Scan scan) throws Throwable {
+ try (Table table = connection.getTable(tableName)) {
+ return std(table, ci, scan);
+ }
+ }
+
+ /**
+ * This is the client side interface/handle for calling the std method for a
+ * given cf-cq combination. It was necessary to add one more call stack as its
+ * return type should be a decimal value, irrespective of what
+ * columninterpreter says. So, this methods collects the necessary parameters
+ * to compute the std and returns the double value.
+ * @param table
+ * @param ci
+ * @param scan
+ * @return <R, S>
+ * @throws Throwable
+ */
+ public <R, S, P extends Message, Q extends Message, T extends Message> double std(
+ final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
+ Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
+ double res = 0d;
+ double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
+ double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
+ res = avgOfSumSq - (avg) * (avg); // variance
+ res = Math.pow(res, 0.5);
+ return res;
+ }
+
+  /**
+   * Helps locate the region holding the median for a given column whose weight is specified
+   * in an optional column. From the individual regions it obtains the sum of values and the
+   * sum of weights.
+   * @param table table to run the aggregate against
+   * @param ci column interpreter used to decode and add the values
+   * @param scan scan describing the row range, the column family and its qualifier(s)
+   * @return pair whose first element is a map between start row of the region
+   *         and (sum of values, sum of weights) for the region, the second element is
+   *         (sum of values, sum of weights) for all the regions chosen
+   * @throws Throwable any failure raised while validating the scan or calling the regions
+   */
+  private <R, S, P extends Message, Q extends Message, T extends Message>
+  Pair<NavigableMap<byte[], List<S>>, List<S>>
+  getMedianArgs(final Table table,
+      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
+    final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
+    final NavigableMap<byte[], List<S>> perRegion =
+        new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
+
+    // Records each region's sums under the row key it is reported with and keeps totals.
+    class MedianCallback implements Batch.Callback<List<S>> {
+      S sumVal = null;
+      S sumWeights = null;
+
+      public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
+        List<S> totals = new ArrayList<S>();
+        totals.add(sumVal);
+        totals.add(sumWeights);
+        return new Pair<NavigableMap<byte[], List<S>>, List<S>>(perRegion, totals);
+      }
+
+      @Override
+      public synchronized void update(byte[] region, byte[] row, List<S> result) {
+        perRegion.put(row, result);
+        sumVal = ci.add(sumVal, result.get(0));
+        sumWeights = ci.add(sumWeights, result.get(1));
+      }
+    }
+
+    final MedianCallback collector = new MedianCallback();
+    final Batch.Call<AggregateService, List<S>> call =
+        new Batch.Call<AggregateService, List<S>>() {
+          @Override
+          public List<S> call(AggregateService instance) throws IOException {
+            ServerRpcController controller = new ServerRpcController();
+            CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> done =
+                new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
+            instance.getMedian(controller, requestArg, done);
+            AggregateResponse response = done.get();
+            if (controller.failedOnException()) {
+              throw controller.getFailedOn();
+            }
+
+            final int parts = response.getFirstPartCount();
+            List<S> sums = new ArrayList<S>();
+            for (int i = 0; i < parts; i++) {
+              T parsed = getParsedGenericInstance(ci.getClass(), 4, response.getFirstPart(i));
+              sums.add(ci.getPromotedValueFromProto(parsed));
+            }
+            return sums;
+          }
+        };
+    table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
+        call, collector);
+    return collector.getMedianParams();
+  }
+
+ /**
+ * This is the client side interface/handler for calling the median method for a
+ * given cf-cq combination. This method collects the necessary parameters
+ * to compute the median and returns the median.
+ * @param tableName
+ * @param ci
+ * @param scan
+ * @return R the median
+ * @throws Throwable
+ */
+ public <R, S, P extends Message, Q extends Message, T extends Message>
+ R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci,
+ Scan scan) throws Throwable {
+ try (Table table = connection.getTable(tableName)) {
+ return median(table, ci, scan);
+ }
+ }
+
+  /**
+   * Client side handle for computing the median of a given cf-cq combination. It first
+   * collects per-region sums to find the region in which the running total crosses half of
+   * the overall total, then scans from that region onwards until the crossing row is found.
+   * When the scan names more than one qualifier for the family, the first one is the value
+   * column and the last one is the weight column; otherwise the value column weighs itself.
+   * The scan passed in is not modified.
+   * @param table table to run the aggregate against
+   * @param ci column interpreter used to decode, add and divide the values
+   * @param scan scan describing the row range, the column family and its qualifier(s)
+   * @return R the median; {@code null} when the scan ends before the halfway point is passed
+   *         or when the very first row scanned already passes it
+   * @throws IOException if the scan carries no qualifier set for its column family
+   * @throws Throwable any failure raised while validating the scan or calling the regions
+   */
+  public <R, S, P extends Message, Q extends Message, T extends Message>
+  R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci,
+      Scan scan) throws Throwable {
+    Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
+    byte[] startRow = null;
+    byte[] colFamily = scan.getFamilies()[0];
+    NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
+    if (quals == null) {
+      // Fail with a descriptive error instead of the NullPointerException that
+      // dereferencing the missing qualifier set would otherwise raise.
+      throw new IOException("Median requires at least one column qualifier for family "
+          + Bytes.toStringBinary(colFamily));
+    }
+    NavigableMap<byte[], List<S>> map = p.getFirst();
+    S sumVal = p.getSecond().get(0);
+    S sumWeights = p.getSecond().get(1);
+    double halfSumVal = ci.divideForAvg(sumVal, 2L);
+    double movingSumVal = 0;
+    boolean weighted = false;
+    if (quals.size() > 1) {
+      weighted = true;
+      halfSumVal = ci.divideForAvg(sumWeights, 2L);
+    }
+    // Read the qualifiers without removing them: the set comes from the caller's scan, and
+    // polling would strip the entries from that set as a side effect of computing a median.
+    byte[] qualifier = quals.isEmpty() ? null : quals.first();
+    // qualifier for the weight column
+    byte[] weightQualifier = weighted ? quals.last() : qualifier;
+
+    // Walk the per-region sums in key order until adding the next one would pass the half.
+    for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
+      S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
+      double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
+      if (newSumVal > halfSumVal) {
+        break; // we found the region with the median
+      }
+      movingSumVal = newSumVal;
+      startRow = entry.getKey();
+    }
+    // scan the region with median and find it
+    Scan scan2 = new Scan(scan);
+    // inherit stop row from method parameter
+    if (startRow != null) {
+      scan2.setStartRow(startRow);
+    }
+    ResultScanner scanner = null;
+    try {
+      int cacheSize = scan2.getCaching();
+      if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
+        scan2.setCacheBlocks(true);
+        cacheSize = 5;
+        scan2.setCaching(cacheSize);
+      }
+      scanner = table.getScanner(scan2);
+      Result[] results = null;
+      R value = null;
+      do {
+        results = scanner.next(cacheSize);
+        if (results != null && results.length > 0) {
+          for (int i = 0; i < results.length; i++) {
+            Result r = results[i];
+            // retrieve weight
+            Cell kv = r.getColumnLatestCell(colFamily, weightQualifier);
+            R newValue = ci.getValue(colFamily, weightQualifier, kv);
+            S s = ci.castToReturnType(newValue);
+            double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
+            // see if we have moved past the median
+            if (newSumVal > halfSumVal) {
+              // The value remembered from the previous row is returned.
+              return value;
+            }
+            movingSumVal = newSumVal;
+            kv = r.getColumnLatestCell(colFamily, qualifier);
+            value = ci.getValue(colFamily, qualifier, kv);
+          }
+        }
+      } while (results != null && results.length > 0);
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Validates the scan and builds the protobuf request sent to the AggregateService.
+   * The request carries the canonical class name of the column interpreter, the
+   * interpreter specific data when the interpreter supplies any, and the converted scan.
+   * @param scan scan to validate and embed in the request
+   * @param ci column interpreter whose class name and request data are embedded
+   * @param canFamilyBeAbsent whether the scan may omit the column family
+   * @return the populated request
+   * @throws IOException if the scan fails validation
+   */
+  <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
+  validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
+      throws IOException {
+    validateParameters(scan, canFamilyBeAbsent);
+    final AggregateRequest.Builder builder = AggregateRequest.newBuilder();
+    builder.setInterpreterClassName(ci.getClass().getCanonicalName());
+    final P interpreterData = ci.getRequestData();
+    if (interpreterData != null) {
+      builder.setInterpreterSpecificBytes(interpreterData.toByteString());
+    }
+    builder.setScan(ProtobufUtil.toScan(scan));
+    return builder.build();
+  }
+
+  /**
+   * Copies the content of a response field into a byte array.
+   * @param response the ByteString to copy
+   * @return a new array holding the bytes of {@code response}
+   */
+  byte[] getBytesFromResponse(ByteString response) {
+    // The former implementation asked a read-only ByteBuffer for its backing array; such a
+    // buffer never exposes one, so that branch could not be taken (and would have ignored
+    // the buffer's offset and length if it had been). Copy the bytes directly instead.
+    return response.toByteArray();
+  }
+
+ /**
+ * Get an instance of the argument type declared in a class's signature. The
+ * argument type is assumed to be a PB Message subclass, and the instance is
+ * created using parseFrom method on the passed ByteString.
+ * @param runtimeClass the runtime type of the class
+ * @param position the position of the argument in the class declaration
+ * @param b the ByteString which should be parsed to get the instance created
+ * @return the instance
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ // Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO.
+ public static <T extends Message>
+ T getParsedGenericInstance(Class<?> runtimeClass, int position, ByteString b)
+ throws IOException {
+ Type type = runtimeClass.getGenericSuperclass();
+ Type argType = ((ParameterizedType)type).getActualTypeArguments()[position];
+ Class<T> classType = (Class<T>)argType;
+ T inst;
+ try {
+ Method m = classType.getMethod("parseFrom", ByteString.class);
+ inst = (T)m.invoke(null, b);
+ return inst;
+ } catch (SecurityException e) {
+ throw new IOException(e);
+ } catch (NoSuchMethodException e) {
+ throw new IOException(e);
+ } catch (IllegalArgumentException e) {
+ throw new IOException(e);
+ } catch (InvocationTargetException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
new file mode 100644
index 0000000..08b0562
--- /dev/null
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
@@ -0,0 +1,530 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
+/**
+ * A concrete AggregateProtocol implementation. It is a system-level coprocessor
+ * that computes the aggregate function at a region level.
+ * {@link ColumnInterpreter} is used to interpret column value. This class is
+ * parameterized with the following (these are the types with which the {@link ColumnInterpreter}
+ * is parameterized, and for more description on these, refer to {@link ColumnInterpreter}):
+ * @param <T> Cell value data type
+ * @param <S> Promoted data type
+ * @param <P> PB message that is used to transport initializer specific bytes
+ * @param <Q> PB message that is used to transport Cell (&lt;T&gt;) instance
+ * @param <R> PB message that is used to transport Promoted (&lt;S&gt;) instance
+ */
+@InterfaceAudience.Private
+public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message>
+extends AggregateService implements CoprocessorService, Coprocessor {
+ protected static final Log log = LogFactory.getLog(AggregateImplementation.class);
+ private RegionCoprocessorEnvironment env;
+
+ /**
+  * Computes the region-local maximum for the column family (and, if one is given, the
+  * column qualifier) named by the Scan carried in the request, over that Scan's row range.
+  * Only the first column family and its first qualifier are consulted; with no qualifier
+  * the maximum over the entire column family is returned.
+  * <p>
+  * The callback receives a response holding the maximum, or null when no value was found
+  * or an IOException was recorded on the controller.
+  */
+ @Override
+ public void getMax(RpcController controller, AggregateRequest request,
+     RpcCallback<AggregateResponse> done) {
+   AggregateResponse response = null;
+   InternalScanner scanner = null;
+   T max = null;
+   try {
+     ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
+     Scan scan = ProtobufUtil.toScan(request.getScan());
+     scanner = env.getRegion().getScanner(scan);
+     byte[] colFamily = scan.getFamilies()[0];
+     NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
+     // The qualifier stays null when the scan names none.
+     byte[] qualifier =
+         (qualifiers == null || qualifiers.isEmpty()) ? null : qualifiers.pollFirst();
+     List<Cell> results = new ArrayList<Cell>();
+     boolean hasMoreRows;
+     do {
+       hasMoreRows = scanner.next(results);
+       for (Cell cell : results) {
+         T candidate = ci.getValue(colFamily, qualifier, cell);
+         // Take the candidate while nothing is held yet, or when it compares greater.
+         if (max == null || (candidate != null && ci.compare(candidate, max) > 0)) {
+           max = candidate;
+         }
+       }
+       results.clear();
+     } while (hasMoreRows);
+     if (max != null) {
+       response = AggregateResponse.newBuilder()
+           .addFirstPart(ci.getProtoForCellType(max).toByteString())
+           .build();
+     }
+   } catch (IOException e) {
+     CoprocessorRpcUtils.setControllerException(controller, e);
+   } finally {
+     if (scanner != null) {
+       try {
+         scanner.close();
+       } catch (IOException ignored) {}
+     }
+   }
+   String regionName = env.getRegion().getRegionInfo().getRegionNameAsString();
+   log.info("Maximum from this region is " + regionName + ": " + max);
+   done.run(response);
+ }
+
+ /**
+  * Computes the region-local minimum for the column family (and, if one is given, the
+  * column qualifier) named by the Scan carried in the request, over that Scan's row range.
+  * Only the first column family and its first qualifier are consulted; with no qualifier
+  * the minimum over the entire column family is returned.
+  * <p>
+  * The callback receives a response holding the minimum, or null when no value was found
+  * or an IOException was recorded on the controller.
+  */
+ @Override
+ public void getMin(RpcController controller, AggregateRequest request,
+     RpcCallback<AggregateResponse> done) {
+   InternalScanner scanner = null;
+   AggregateResponse response = null;
+   T min = null;
+   try {
+     ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
+     Scan scan = ProtobufUtil.toScan(request.getScan());
+     scanner = env.getRegion().getScanner(scan);
+     byte[] colFamily = scan.getFamilies()[0];
+     NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
+     byte[] qualifier = null;
+     boolean qualifierGiven = qualifiers != null && !qualifiers.isEmpty();
+     if (qualifierGiven) {
+       qualifier = qualifiers.pollFirst();
+     }
+     List<Cell> results = new ArrayList<Cell>();
+     boolean hasMoreRows;
+     do {
+       hasMoreRows = scanner.next(results);
+       for (Cell cell : results) {
+         T candidate = ci.getValue(colFamily, qualifier, cell);
+         // Take the candidate while nothing is held yet, or when it compares smaller.
+         boolean replace = min == null || (candidate != null && ci.compare(candidate, min) < 0);
+         if (replace) {
+           min = candidate;
+         }
+       }
+       results.clear();
+     } while (hasMoreRows);
+     if (min != null) {
+       AggregateResponse.Builder builder = AggregateResponse.newBuilder();
+       builder.addFirstPart(ci.getProtoForCellType(min).toByteString());
+       response = builder.build();
+     }
+   } catch (IOException e) {
+     CoprocessorRpcUtils.setControllerException(controller, e);
+   } finally {
+     if (scanner != null) {
+       try {
+         scanner.close();
+       } catch (IOException ignored) {}
+     }
+   }
+   String regionName = env.getRegion().getRegionInfo().getRegionNameAsString();
+   log.info("Minimum from this region is " + regionName + ": " + min);
+   done.run(response);
+ }
+
+ /**
+  * Gives the sum for a given combination of column qualifier and column
+  * family, in the given row range as defined in the Scan object. In its
+  * current implementation, it takes one column family and one column qualifier
+  * (if provided). In case of null column qualifier, sum for the entire column
+  * family will be returned.
+  * <p>
+  * Cells for which the interpreter yields no value are skipped. The callback is run with
+  * a null response when nothing was summed or when an IOException was recorded on the
+  * controller.
+  * @param controller receives any IOException raised while building the interpreter or
+  *   scanning
+  * @param request carries the Scan and the interpreter class name
+  * @param done callback run with the region-level sum, or with null
+  */
+ @Override
+ public void getSum(RpcController controller, AggregateRequest request,
+     RpcCallback<AggregateResponse> done) {
+   AggregateResponse response = null;
+   InternalScanner scanner = null;
+   // Declared outside the try block so the value actually accumulated can be logged below.
+   S sumVal = null;
+   try {
+     ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
+     T temp;
+     Scan scan = ProtobufUtil.toScan(request.getScan());
+     scanner = env.getRegion().getScanner(scan);
+     byte[] colFamily = scan.getFamilies()[0];
+     NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
+     byte[] qualifier = null;
+     if (qualifiers != null && !qualifiers.isEmpty()) {
+       qualifier = qualifiers.pollFirst();
+     }
+     List<Cell> results = new ArrayList<Cell>();
+     boolean hasMoreRows = false;
+     do {
+       hasMoreRows = scanner.next(results);
+       int listSize = results.size();
+       for (int i = 0; i < listSize; i++) {
+         temp = ci.getValue(colFamily, qualifier, results.get(i));
+         if (temp != null) {
+           sumVal = ci.add(sumVal, ci.castToReturnType(temp));
+         }
+       }
+       results.clear();
+     } while (hasMoreRows);
+     if (sumVal != null) {
+       response = AggregateResponse.newBuilder().addFirstPart(
+         ci.getProtoForPromotedType(sumVal).toByteString()).build();
+     }
+   } catch (IOException e) {
+     CoprocessorRpcUtils.setControllerException(controller, e);
+   } finally {
+     if (scanner != null) {
+       try {
+         scanner.close();
+       } catch (IOException ignored) {}
+     }
+   }
+   // Log the accumulated sum; previously a never-updated local was logged, so the
+   // message always reported 0.
+   log.debug("Sum from this region is "
+       + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + sumVal);
+   done.run(response);
+ }
+
+ /**
+  * Counts the rows of this region that the Scan carried in the request returns, for the
+  * Scan's column family and column qualifier (both optional) and its row range. When the
+  * Scan has neither a filter nor a qualifier, a {@link FirstKeyOnlyFilter} is installed
+  * before scanning.
+  * <p>
+  * The callback receives the count encoded as an 8-byte long, or null when an IOException
+  * was recorded on the controller.
+  */
+ @Override
+ public void getRowNum(RpcController controller, AggregateRequest request,
+     RpcCallback<AggregateResponse> done) {
+   AggregateResponse response = null;
+   long counter = 0L;
+   List<Cell> results = new ArrayList<Cell>();
+   InternalScanner scanner = null;
+   try {
+     Scan scan = ProtobufUtil.toScan(request.getScan());
+     byte[][] colFamilies = scan.getFamilies();
+     byte[] qualifier = null;
+     if (colFamilies != null) {
+       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamilies[0]);
+       if (qualifiers != null && !qualifiers.isEmpty()) {
+         // NOTE(review): pollFirst() removes the element from the set handed out by the
+         // scan's family map, and here it runs before the scanner is opened - confirm that
+         // this does not change which columns the scan covers.
+         qualifier = qualifiers.pollFirst();
+       }
+     }
+     if (scan.getFilter() == null && qualifier == null) {
+       scan.setFilter(new FirstKeyOnlyFilter());
+     }
+     scanner = env.getRegion().getScanner(scan);
+     boolean hasMoreRows;
+     do {
+       hasMoreRows = scanner.next(results);
+       // Only a next() call that produced at least one cell counts as a row.
+       if (!results.isEmpty()) {
+         counter++;
+       }
+       results.clear();
+     } while (hasMoreRows);
+     ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
+     bb.rewind();
+     AggregateResponse.Builder builder = AggregateResponse.newBuilder();
+     builder.addFirstPart(ByteString.copyFrom(bb));
+     response = builder.build();
+   } catch (IOException e) {
+     CoprocessorRpcUtils.setControllerException(controller, e);
+   } finally {
+     if (scanner != null) {
+       try {
+         scanner.close();
+       } catch (IOException ignored) {}
+     }
+   }
+   String regionName = env.getRegion().getRegionInfo().getRegionNameAsString();
+   log.info("Row counter from this region is " + regionName + ": " + counter);
+   done.run(response);
+ }
+
+ /**
+  * Gives a Pair with first object as Sum and second object as row count,
+  * computed for a given combination of column qualifier and column family in
+  * the given row range as defined in the Scan object. In its current
+  * implementation, it takes one column family and one column qualifier (if
+  * provided). In case of null column qualifier, an aggregate sum over the
+  * entire column family will be returned.
+  * <p>
+  * The average is computed in
+  * AggregationClient#avg(byte[], ColumnInterpreter, Scan) by
+  * processing results from all regions, so it is "ok" to pass sum and a Long
+  * type.
+  * <p>
+  * The row count is incremented once per scanner.next() call. The callback is run with a
+  * null response when no sum was accumulated or when an IOException was recorded on the
+  * controller.
+  * @param controller receives any IOException raised while building the interpreter or
+  *   scanning
+  * @param request carries the Scan and the interpreter class name
+  * @param done callback run with the (sum, row count) pair, or with null
+  */
+ @Override
+ public void getAvg(RpcController controller, AggregateRequest request,
+     RpcCallback<AggregateResponse> done) {
+   AggregateResponse response = null;
+   InternalScanner scanner = null;
+   try {
+     ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
+     S sumVal = null;
+     // Primitive counter: a boxed Long here was re-boxed on every increment.
+     long rowCountVal = 0L;
+     Scan scan = ProtobufUtil.toScan(request.getScan());
+     scanner = env.getRegion().getScanner(scan);
+     byte[] colFamily = scan.getFamilies()[0];
+     NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
+     byte[] qualifier = null;
+     if (qualifiers != null && !qualifiers.isEmpty()) {
+       qualifier = qualifiers.pollFirst();
+     }
+     List<Cell> results = new ArrayList<Cell>();
+     boolean hasMoreRows = false;
+
+     do {
+       results.clear();
+       hasMoreRows = scanner.next(results);
+       int listSize = results.size();
+       for (int i = 0; i < listSize; i++) {
+         sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
+             qualifier, results.get(i))));
+       }
+       rowCountVal++;
+     } while (hasMoreRows);
+     if (sumVal != null) {
+       ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
+       AggregateResponse.Builder pair = AggregateResponse.newBuilder();
+       pair.addFirstPart(first);
+       // The row count travels as a raw 8-byte long in the second part.
+       ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
+       bb.rewind();
+       pair.setSecondPart(ByteString.copyFrom(bb));
+       response = pair.build();
+     }
+   } catch (IOException e) {
+     CoprocessorRpcUtils.setControllerException(controller, e);
+   } finally {
+     if (scanner != null) {
+       try {
+         scanner.close();
+       } catch (IOException ignored) {}
+     }
+   }
+   done.run(response);
+ }
+
+ /**
+  * Gives a Pair whose first object is a List holding the sum and the sum of squares, and
+  * whose second object is the row count. It is computed for the column family and column
+  * qualifier (if provided) of the Scan carried in the request, over that Scan's row range;
+  * only one column family and one column qualifier are taken into account.
+  * <p>
+  * The idea is to obtain the variance first, as the average of the squares less the square
+  * of the average; the standard deviation is then the square root of the variance.
+  * <p>
+  * The callback is run with a null response when no sum was accumulated or when an
+  * IOException was recorded on the controller.
+  */
+ @Override
+ public void getStd(RpcController controller, AggregateRequest request,
+     RpcCallback<AggregateResponse> done) {
+   AggregateResponse response = null;
+   InternalScanner scanner = null;
+   try {
+     ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
+     S sumVal = null;
+     S sumSqVal = null;
+     long rowCountVal = 0L;
+     Scan scan = ProtobufUtil.toScan(request.getScan());
+     scanner = env.getRegion().getScanner(scan);
+     byte[] colFamily = scan.getFamilies()[0];
+     NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
+     byte[] qualifier =
+         (qualifiers == null || qualifiers.isEmpty()) ? null : qualifiers.pollFirst();
+     List<Cell> results = new ArrayList<Cell>();
+     boolean hasMoreRows;
+     do {
+       // Sum of the cells returned by this next() call.
+       S rowSum = null;
+       hasMoreRows = scanner.next(results);
+       for (Cell cell : results) {
+         T value = ci.getValue(colFamily, qualifier, cell);
+         rowSum = ci.add(rowSum, ci.castToReturnType(value));
+       }
+       results.clear();
+       sumVal = ci.add(sumVal, rowSum);
+       sumSqVal = ci.add(sumSqVal, ci.multiply(rowSum, rowSum));
+       rowCountVal++;
+     } while (hasMoreRows);
+     if (sumVal != null) {
+       ByteString sumBytes = ci.getProtoForPromotedType(sumVal).toByteString();
+       ByteString sumSqBytes = ci.getProtoForPromotedType(sumSqVal).toByteString();
+       AggregateResponse.Builder builder = AggregateResponse.newBuilder();
+       builder.addFirstPart(sumBytes);
+       builder.addFirstPart(sumSqBytes);
+       // The row count travels as a raw 8-byte long in the second part.
+       ByteBuffer countBuffer = ByteBuffer.allocate(8).putLong(rowCountVal);
+       countBuffer.rewind();
+       builder.setSecondPart(ByteString.copyFrom(countBuffer));
+       response = builder.build();
+     }
+   } catch (IOException e) {
+     CoprocessorRpcUtils.setControllerException(controller, e);
+   } finally {
+     if (scanner != null) {
+       try {
+         scanner.close();
+       } catch (IOException ignored) {}
+     }
+   }
+   done.run(response);
+ }
+
+ /**
+  * Gives a List containing the sum of values and the sum of weights, computed for the
+  * column family and column qualifier(s) of the Scan carried in the request, over that
+  * Scan's row range. One column family and up to two column qualifiers are used: the first
+  * qualifier names the values column and the last remaining one (optional) names the
+  * weights column.
+  * <p>
+  * When no weights were summed, the interpreter's minimum value (cast to the promoted
+  * type) is sent in place of the weight sum. The callback is run with a null response
+  * when an IOException was recorded on the controller.
+  */
+ @Override
+ public void getMedian(RpcController controller, AggregateRequest request,
+     RpcCallback<AggregateResponse> done) {
+   InternalScanner scanner = null;
+   AggregateResponse response = null;
+   try {
+     ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
+     S sumVal = null;
+     S sumWeights = null;
+     Scan scan = ProtobufUtil.toScan(request.getScan());
+     scanner = env.getRegion().getScanner(scan);
+     byte[] colFamily = scan.getFamilies()[0];
+     NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
+     byte[] valQualifier = null;
+     byte[] weightQualifier = null;
+     if (qualifiers != null && !qualifiers.isEmpty()) {
+       valQualifier = qualifiers.pollFirst();
+       // A weighted median needs a second qualifier; pollLast() yields null if none is left.
+       weightQualifier = qualifiers.pollLast();
+     }
+     List<Cell> results = new ArrayList<Cell>();
+     boolean hasMoreRows;
+     do {
+       // Partial sums over the cells returned by this next() call.
+       S rowVal = null;
+       S rowWeight = null;
+       hasMoreRows = scanner.next(results);
+       for (Cell kv : results) {
+         rowVal = ci.add(rowVal,
+             ci.castToReturnType(ci.getValue(colFamily, valQualifier, kv)));
+         if (weightQualifier != null) {
+           rowWeight = ci.add(rowWeight,
+               ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
+         }
+       }
+       results.clear();
+       sumVal = ci.add(sumVal, rowVal);
+       sumWeights = ci.add(sumWeights, rowWeight);
+     } while (hasMoreRows);
+     ByteString sumValBytes = ci.getProtoForPromotedType(sumVal).toByteString();
+     S weightsToSend;
+     if (sumWeights == null) {
+       weightsToSend = ci.castToReturnType(ci.getMinValue());
+     } else {
+       weightsToSend = sumWeights;
+     }
+     ByteString sumWeightsBytes = ci.getProtoForPromotedType(weightsToSend).toByteString();
+     AggregateResponse.Builder builder = AggregateResponse.newBuilder();
+     builder.addFirstPart(sumValBytes);
+     builder.addFirstPart(sumWeightsBytes);
+     response = builder.build();
+   } catch (IOException e) {
+     CoprocessorRpcUtils.setControllerException(controller, e);
+   } finally {
+     if (scanner != null) {
+       try {
+         scanner.close();
+       } catch (IOException ignored) {}
+     }
+   }
+   done.run(response);
+ }
+
+ @SuppressWarnings("unchecked")
+ // Instantiates the ColumnInterpreter class named in the request and, when the request
+ // carries interpreter-specific bytes, initializes it with the message parsed from them.
+ // TODO: relies on the client-side AggregationClient#getParsedGenericInstance helper;
+ // undo this interdependence.
+ ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest(
+     AggregateRequest request) throws IOException {
+   String interpreterClassName = request.getInterpreterClassName();
+   try {
+     ColumnInterpreter<T, S, P, Q, R> interpreter =
+         (ColumnInterpreter<T, S, P, Q, R>) Class.forName(interpreterClassName).newInstance();
+     if (!request.hasInterpreterSpecificBytes()) {
+       return interpreter;
+     }
+     ByteString initBytes = request.getInterpreterSpecificBytes();
+     // Index 2 selects the third type argument (P) of the interpreter's generic superclass.
+     P initMsg = AggregationClient.getParsedGenericInstance(interpreter.getClass(), 2, initBytes);
+     interpreter.initialize(initMsg);
+     return interpreter;
+   } catch (InstantiationException e) {
+     throw new IOException(e);
+   } catch (IllegalAccessException e) {
+     throw new IOException(e);
+   } catch (ClassNotFoundException e) {
+     throw new IOException(e);
+   }
+ }
+
+ /**
+  * Returns this endpoint itself as the protobuf {@link Service} to expose.
+  */
+ @Override
+ public Service getService() {
+   return this;
+ }
+
+ /**
+  * Keeps hold of the coprocessor environment handed over by the
+  * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} of the region on which
+  * this coprocessor is loaded. Being a coprocessor endpoint, it expects to be loaded on a
+  * table region and therefore requires the environment to be a
+  * {@link RegionCoprocessorEnvironment}.
+  * @param env the environment provided by the coprocessor host
+  * @throws IOException if the provided environment is not an instance of
+  *   {@code RegionCoprocessorEnvironment}
+  */
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+   // Reject anything other than a region environment up front.
+   if (!(env instanceof RegionCoprocessorEnvironment)) {
+     throw new CoprocessorException("Must be loaded on a table region!");
+   }
+   this.env = (RegionCoprocessorEnvironment) env;
+ }
+
+ /**
+  * No-op: this endpoint does no work when it is stopped.
+  * @param env the environment provided by the coprocessor host (unused)
+  */
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+   // intentionally empty
+ }
+
+}