You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/11/02 06:17:04 UTC
hbase git commit: HBASE-18972 Use Builder pattern to remove nullable
parameters for coprocessor methods in RawAsyncTable interface
Repository: hbase
Updated Branches:
refs/heads/master 49abc2e1c -> fad7d01d8
HBASE-18972 Use Builder pattern to remove nullable parameters for coprocessor methods in RawAsyncTable interface
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fad7d01d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fad7d01d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fad7d01d
Branch: refs/heads/master
Commit: fad7d01d8f28f2de8e82b27306568c265bd32b41
Parents: 49abc2e
Author: zhangduo <zh...@apache.org>
Authored: Thu Nov 2 13:55:16 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Nov 2 13:58:29 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/client/RawAsyncTable.java | 85 ++++++++++++--------
.../hadoop/hbase/client/RawAsyncTableImpl.java | 75 +++++++++++++----
.../coprocessor/AsyncAggregationClient.java | 72 ++++++++++-------
3 files changed, 151 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fad7d01d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
index cd0226b..102f279 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
@@ -38,11 +38,6 @@ import com.google.protobuf.RpcController;
* <p>
* So, only experts that want to build high performance service should use this interface directly,
* especially for the {@link #scan(Scan, RawScanResultConsumer)} below.
- * <p>
- * TODO: For now the only difference between this interface and {@link AsyncTable} is the scan
- * method. The {@link RawScanResultConsumer} exposes the implementation details of a scan(heartbeat)
- * so it is not suitable for a normal user. If it is still the only difference after we implement
- * most features of AsyncTable, we can think about merge these two interfaces.
* @since 2.0.0
*/
@InterfaceAudience.Public
@@ -135,8 +130,8 @@ public interface RawAsyncTable extends AsyncTableBase {
* the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
* passed all the return values to you(through the {@link #onRegionComplete(RegionInfo, Object)}
* or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e, there will be no
- * {@link #onRegionComplete(RegionInfo, Object)} or
- * {@link #onRegionError(RegionInfo, Throwable)} calls in the future.
+ * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)}
+ * calls in the future.
* <p>
* Here is a pseudo code to describe a typical implementation of a range coprocessor service
* method to help you better understand how the {@link CoprocessorCallback} will be called. The
@@ -200,25 +195,56 @@ public interface RawAsyncTable extends AsyncTableBase {
}
/**
- * Execute the given coprocessor call on the regions which are covered by the range from
- * {@code startKey} inclusive and {@code endKey} exclusive. See the comment of
- * {@link #coprocessorService(Function, CoprocessorCallable, byte[], boolean, byte[], boolean, CoprocessorCallback)}
- * for more details.
- * @see #coprocessorService(Function, CoprocessorCallable, byte[], boolean, byte[], boolean,
- * CoprocessorCallback)
+ * Helper class for sending a coprocessorService request that executes a coprocessor call on
+ * regions which are covered by a range.
+ * <p>
+ * If {@code fromRow} is not specified the selection will start with the first table region. If
+ * {@code toRow} is not specified the selection will continue through the last table region.
+ * @param <S> the type of the protobuf Service you want to call.
+ * @param <R> the type of the return value.
*/
- default <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
- CoprocessorCallable<S, R> callable, byte[] startKey, byte[] endKey,
- CoprocessorCallback<R> callback) {
- coprocessorService(stubMaker, callable, startKey, true, endKey, false, callback);
+ interface CoprocessorServiceBuilder<S, R> {
+
+ /**
+ * @param startKey start region selection with region containing this row, inclusive.
+ */
+ default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) {
+ return fromRow(startKey, true);
+ }
+
+ /**
+ * @param startKey start region selection with region containing this row
+ * @param inclusive whether to include the startKey
+ */
+ CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive);
+
+ /**
+ * @param endKey select regions up to and including the region containing this row; the row itself is exclusive.
+ */
+ default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) {
+ return toRow(endKey, false);
+ }
+
+ /**
+ * @param endKey select regions up to and including the region containing this row
+ * @param inclusive whether to include the endKey
+ */
+ CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive);
+
+ /**
+ * Execute the coprocessorService request. You can get the response through the
+ * {@link CoprocessorCallback}.
+ */
+ void execute();
}
/**
- * Execute the given coprocessor call on the regions which are covered by the range from
- * {@code startKey} and {@code endKey}. The inclusive of boundaries are specified by
- * {@code startKeyInclusive} and {@code endKeyInclusive}. The {@code stubMaker} is just a
- * delegation to the {@code xxxService.newStub} call. Usually it is only a one line lambda
- * expression, like:
+ * Execute a coprocessor call on the regions which are covered by a range.
+ * <p>
+ * Use the returned {@link CoprocessorServiceBuilder} to construct your request and then execute it.
+ * <p>
+ * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it
+ * is only a one line lambda expression, like:
*
* <pre>
* <code>
@@ -229,20 +255,9 @@ public interface RawAsyncTable extends AsyncTableBase {
* @param stubMaker a delegation to the actual {@code newStub} call.
* @param callable a delegation to the actual protobuf rpc call. See the comment of
* {@link CoprocessorCallable} for more details.
- * @param startKey start region selection with region containing this row. If {@code null}, the
- * selection will start with the first table region.
- * @param startKeyInclusive whether to include the startKey
- * @param endKey select regions up to and including the region containing this row. If
- * {@code null}, selection will continue through the last table region.
- * @param endKeyInclusive whether to include the endKey
* @param callback callback to get the response. See the comment of {@link CoprocessorCallback}
* for more details.
- * @param <S> the type of the asynchronous stub
- * @param <R> the type of the return value
- * @see CoprocessorCallable
- * @see CoprocessorCallback
*/
- <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
- CoprocessorCallable<S, R> callable, byte[] startKey, boolean startKeyInclusive, byte[] endKey,
- boolean endKeyInclusive, CoprocessorCallback<R> callback);
+ <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
+ CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fad7d01d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 6107f7f..d4de573 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
-import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
-import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
@@ -29,7 +27,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,6 +35,7 @@ import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
@@ -560,19 +558,64 @@ class RawAsyncTableImpl implements RawAsyncTable {
});
}
- @Override
- public <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
- CoprocessorCallable<S, R> callable, byte[] startKey, boolean startKeyInclusive, byte[] endKey,
- boolean endKeyInclusive, CoprocessorCallback<R> callback) {
- byte[] nonNullStartKey = Optional.ofNullable(startKey).orElse(EMPTY_START_ROW);
- byte[] nonNullEndKey = Optional.ofNullable(endKey).orElse(EMPTY_END_ROW);
- List<HRegionLocation> locs = new ArrayList<>();
- conn.getLocator()
- .getRegionLocation(tableName, nonNullStartKey,
- startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs)
- .whenComplete(
- (loc, error) -> onLocateComplete(stubMaker, callable, callback, locs, nonNullEndKey,
- endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
+ private final class CoprocessorServiceBuilderImpl<S, R>
+ implements CoprocessorServiceBuilder<S, R> {
+
+ private final Function<RpcChannel, S> stubMaker;
+
+ private final CoprocessorCallable<S, R> callable;
+
+ private final CoprocessorCallback<R> callback;
+
+ private byte[] startKey = HConstants.EMPTY_START_ROW;
+
+ private boolean startKeyInclusive;
+
+ private byte[] endKey = HConstants.EMPTY_END_ROW;
+
+ private boolean endKeyInclusive;
+
+ public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
+ CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback) {
+ this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
+ this.callable = Preconditions.checkNotNull(callable, "callable is null");
+ this.callback = Preconditions.checkNotNull(callback, "callback is null");
+ }
+
+ @Override
+ public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) {
+ this.startKey = Preconditions.checkNotNull(startKey,
+ "startKey is null. Consider using" +
+ " an empty byte array, or just do not call this method if you want to start selection" +
+ " from the first region");
+ this.startKeyInclusive = inclusive;
+ return this;
+ }
+
+ @Override
+ public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) {
+ this.endKey = Preconditions.checkNotNull(endKey,
+ "endKey is null. Consider using" +
+ " an empty byte array, or just do not call this method if you want to continue" +
+ " selection to the last region");
+ this.endKeyInclusive = inclusive;
+ return this;
+ }
+
+ @Override
+ public void execute() {
+ conn.getLocator().getRegionLocation(tableName, startKey,
+ startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs)
+ .whenComplete(
+ (loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(),
+ endKey, endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
+ }
}
+ @Override
+ public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
+ Function<RpcChannel, S> stubMaker, CoprocessorCallable<S, R> callable,
+ CoprocessorCallback<R> callback) {
+ return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fad7d01d/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
index 51c8248..ff9b873 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.client.coprocessor;
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
+import com.google.protobuf.Message;
+
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
@@ -29,6 +31,7 @@ import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RawAsyncTable;
import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback;
import org.apache.hadoop.hbase.client.RawScanResultConsumer;
@@ -43,8 +46,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
-import com.google.protobuf.Message;
-
/**
* This client class is for invoking the aggregate functions deployed on the Region Server side via
* the AggregateService. This class will implement the supporting functionality for
@@ -120,6 +121,10 @@ public class AsyncAggregationClient {
return ci.getPromotedValueFromProto(t);
}
+ private static byte[] nullToEmpty(byte[] b) {
+ return b != null ? b : HConstants.EMPTY_BYTE_ARRAY;
+ }
+
public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
max(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<R> future = new CompletableFuture<>();
@@ -149,10 +154,11 @@ public class AsyncAggregationClient {
return max;
}
};
- table.coprocessorService(channel -> AggregateService.newStub(channel),
- (stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback),
- scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
- callback);
+ table
+ .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
+ (stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback), callback)
+ .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
+ .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
return future;
}
@@ -185,10 +191,11 @@ public class AsyncAggregationClient {
return min;
}
};
- table.coprocessorService(channel -> AggregateService.newStub(channel),
- (stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback),
- scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
- callback);
+ table
+ .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
+ (stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback), callback)
+ .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
+ .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
return future;
}
@@ -217,10 +224,11 @@ public class AsyncAggregationClient {
return count;
}
};
- table.coprocessorService(channel -> AggregateService.newStub(channel),
- (stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback),
- scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
- callback);
+ table
+ .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
+ (stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback), callback)
+ .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
+ .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
return future;
}
@@ -251,10 +259,11 @@ public class AsyncAggregationClient {
return sum;
}
};
- table.coprocessorService(channel -> AggregateService.newStub(channel),
- (stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback),
- scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
- callback);
+ table
+ .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
+ (stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback), callback)
+ .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
+ .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
return future;
}
@@ -288,10 +297,11 @@ public class AsyncAggregationClient {
return ci.divideForAvg(sum, count);
}
};
- table.coprocessorService(channel -> AggregateService.newStub(channel),
- (stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback),
- scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
- callback);
+ table
+ .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
+ (stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback), callback)
+ .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
+ .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
return future;
}
@@ -330,10 +340,11 @@ public class AsyncAggregationClient {
return Math.sqrt(avgSq - avg * avg);
}
};
- table.coprocessorService(channel -> AggregateService.newStub(channel),
- (stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback),
- scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
- callback);
+ table
+ .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
+ (stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback), callback)
+ .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
+ .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
return future;
}
@@ -368,10 +379,11 @@ public class AsyncAggregationClient {
return map;
}
};
- table.coprocessorService(channel -> AggregateService.newStub(channel),
- (stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback),
- scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
- callback);
+ table
+ .<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
+ (stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback), callback)
+ .fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
+ .toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
return future;
}