You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/11/16 06:37:07 UTC
[2/3] hbase git commit: HBASE-19251 Merge RawAsyncTable and AsyncTable
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
deleted file mode 100644
index 7d24c4f..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static java.util.stream.Collectors.toList;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
-
-import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * The base interface for asynchronous version of Table. Obtain an instance from a
- * {@link AsyncConnection}.
- * <p>
- * The implementation is required to be thread safe.
- * <p>
- * Usually the implementation will not throw any exception directly. You need to get the exception
- * from the returned {@link CompletableFuture}.
- * @since 2.0.0
- */
-@InterfaceAudience.Public
-public interface AsyncTableBase {
-
- /**
- * Gets the fully qualified table name instance of this table.
- */
- TableName getName();
-
- /**
- * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
- * <p>
- * The reference returned is not a copy, so any change made to it will affect this instance.
- */
- Configuration getConfiguration();
-
- /**
- * Get timeout of each rpc request in this Table instance. It will be overridden by a more
- * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
- * @see #getReadRpcTimeout(TimeUnit)
- * @see #getWriteRpcTimeout(TimeUnit)
- * @param unit the unit of time the timeout to be represented in
- * @return rpc timeout in the specified time unit
- */
- long getRpcTimeout(TimeUnit unit);
-
- /**
- * Get timeout of each rpc read request in this Table instance.
- * @param unit the unit of time the timeout to be represented in
- * @return read rpc timeout in the specified time unit
- */
- long getReadRpcTimeout(TimeUnit unit);
-
- /**
- * Get timeout of each rpc write request in this Table instance.
- * @param unit the unit of time the timeout to be represented in
- * @return write rpc timeout in the specified time unit
- */
- long getWriteRpcTimeout(TimeUnit unit);
-
- /**
- * Get timeout of each operation in Table instance.
- * @param unit the unit of time the timeout to be represented in
- * @return operation rpc timeout in the specified time unit
- */
- long getOperationTimeout(TimeUnit unit);
-
- /**
- * Get the timeout of a single operation in a scan. It works like operation timeout for other
- * operations.
- * @param unit the unit of time the timeout to be represented in
- * @return scan rpc timeout in the specified time unit
- */
- long getScanTimeout(TimeUnit unit);
-
- /**
- * Test for the existence of columns in the table, as specified by the Get.
- * <p>
- * This will return true if the Get matches one or more keys, false if not.
- * <p>
- * This is a server-side call so it prevents any data from being transfered to the client.
- * @return true if the specified Get matches one or more keys, false if not. The return value will
- * be wrapped by a {@link CompletableFuture}.
- */
- default CompletableFuture<Boolean> exists(Get get) {
- return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists());
- }
-
- /**
- * Extracts certain cells from a given row.
- * @param get The object that specifies what data to fetch and from which row.
- * @return The data coming from the specified row, if it exists. If the row specified doesn't
- * exist, the {@link Result} instance returned won't contain any
- * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The
- * return value will be wrapped by a {@link CompletableFuture}.
- */
- CompletableFuture<Result> get(Get get);
-
- /**
- * Puts some data to the table.
- * @param put The data to put.
- * @return A {@link CompletableFuture} that always returns null when complete normally.
- */
- CompletableFuture<Void> put(Put put);
-
- /**
- * Deletes the specified cells/row.
- * @param delete The object that specifies what to delete.
- * @return A {@link CompletableFuture} that always returns null when complete normally.
- */
- CompletableFuture<Void> delete(Delete delete);
-
- /**
- * Appends values to one or more columns within a single row.
- * <p>
- * This operation does not appear atomic to readers. Appends are done under a single row lock, so
- * write operations to a row are synchronized, but readers do not take row locks so get and scan
- * operations can see this operation partially completed.
- * @param append object that specifies the columns and amounts to be used for the increment
- * operations
- * @return values of columns after the append operation (maybe null). The return value will be
- * wrapped by a {@link CompletableFuture}.
- */
- CompletableFuture<Result> append(Append append);
-
- /**
- * Increments one or more columns within a single row.
- * <p>
- * This operation does not appear atomic to readers. Increments are done under a single row lock,
- * so write operations to a row are synchronized, but readers do not take row locks so get and
- * scan operations can see this operation partially completed.
- * @param increment object that specifies the columns and amounts to be used for the increment
- * operations
- * @return values of columns after the increment. The return value will be wrapped by a
- * {@link CompletableFuture}.
- */
- CompletableFuture<Result> increment(Increment increment);
-
- /**
- * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)}
- * <p>
- * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
- * @param row The row that contains the cell to increment.
- * @param family The column family of the cell to increment.
- * @param qualifier The column qualifier of the cell to increment.
- * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
- * @return The new value, post increment. The return value will be wrapped by a
- * {@link CompletableFuture}.
- */
- default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
- long amount) {
- return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
- }
-
- /**
- * Atomically increments a column value. If the column value already exists and is not a
- * big-endian long, this could throw an exception. If the column value does not yet exist it is
- * initialized to <code>amount</code> and written to the specified column.
- * <p>
- * Setting durability to {@link Durability#SKIP_WAL} means that in a fail scenario you will lose
- * any increments that have not been flushed.
- * @param row The row that contains the cell to increment.
- * @param family The column family of the cell to increment.
- * @param qualifier The column qualifier of the cell to increment.
- * @param amount The amount to increment the cell with (or decrement, if the amount is negative).
- * @param durability The persistence guarantee for this increment.
- * @return The new value, post increment. The return value will be wrapped by a
- * {@link CompletableFuture}.
- */
- default CompletableFuture<Long> incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
- long amount, Durability durability) {
- Preconditions.checkNotNull(row, "row is null");
- Preconditions.checkNotNull(family, "family is null");
- return increment(
- new Increment(row).addColumn(family, qualifier, amount).setDurability(durability))
- .thenApply(r -> Bytes.toLong(r.getValue(family, qualifier)));
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
- * adds the Put/Delete/RowMutations.
- * <p>
- * Use the returned {@link CheckAndMutateBuilder} to construct your request and then execute it.
- * This is a fluent style API, the code is like:
- *
- * <pre>
- * <code>
- * table.checkAndMutate(row, family).qualifier(qualifier).ifNotExists().thenPut(put)
- * .thenAccept(succ -> {
- * if (succ) {
- * System.out.println("Check and put succeeded");
- * } else {
- * System.out.println("Check and put failed");
- * }
- * });
- * </code>
- * </pre>
- */
- CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family);
-
- /**
- * A helper class for sending checkAndMutate request.
- */
- interface CheckAndMutateBuilder {
-
- /**
- * @param qualifier column qualifier to check.
- */
- CheckAndMutateBuilder qualifier(byte[] qualifier);
-
- /**
- * Check for lack of column.
- */
- CheckAndMutateBuilder ifNotExists();
-
- default CheckAndMutateBuilder ifEquals(byte[] value) {
- return ifMatches(CompareOperator.EQUAL, value);
- }
-
- /**
- * @param compareOp comparison operator to use
- * @param value the expected value
- */
- CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value);
-
- /**
- * @param put data to put if check succeeds
- * @return {@code true} if the new put was executed, {@code false} otherwise. The return value
- * will be wrapped by a {@link CompletableFuture}.
- */
- CompletableFuture<Boolean> thenPut(Put put);
-
- /**
- * @param delete data to delete if check succeeds
- * @return {@code true} if the new delete was executed, {@code false} otherwise. The return
- * value will be wrapped by a {@link CompletableFuture}.
- */
- CompletableFuture<Boolean> thenDelete(Delete delete);
-
- /**
- * @param mutation mutations to perform if check succeeds
- * @return true if the new mutation was executed, false otherwise. The return value will be
- * wrapped by a {@link CompletableFuture}.
- */
- CompletableFuture<Boolean> thenMutate(RowMutations mutation);
- }
-
- /**
- * Performs multiple mutations atomically on a single row. Currently {@link Put} and
- * {@link Delete} are supported.
- * @param mutation object that specifies the set of mutations to perform atomically
- * @return A {@link CompletableFuture} that always returns null when complete normally.
- */
- CompletableFuture<Void> mutateRow(RowMutations mutation);
-
- /**
- * Return all the results that match the given scan object.
- * <p>
- * Notice that usually you should use this method with a {@link Scan} object that has limit set.
- * For example, if you want to get the closest row after a given row, you could do this:
- * <p>
- *
- * <pre>
- * <code>
- * table.scanAll(new Scan().withStartRow(row, false).setLimit(1)).thenAccept(results -> {
- * if (results.isEmpty()) {
- * System.out.println("No row after " + Bytes.toStringBinary(row));
- * } else {
- * System.out.println("The closest row after " + Bytes.toStringBinary(row) + " is "
- * + Bytes.toStringBinary(results.stream().findFirst().get().getRow()));
- * }
- * });
- * </code>
- * </pre>
- * <p>
- * If your result set is very large, you should use other scan method to get a scanner or use
- * callback to process the results. They will do chunking to prevent OOM. The scanAll method will
- * fetch all the results and store them in a List and then return the list to you.
- * <p>
- * The scan metrics will be collected background if you enable it but you have no way to get it.
- * Usually you can get scan metrics from {@code ResultScanner}, or through
- * {@code ScanResultConsumer.onScanMetricsCreated} but this method only returns a list of results.
- * So if you really care about scan metrics then you'd better use other scan methods which return
- * a {@code ResultScanner} or let you pass in a {@code ScanResultConsumer}. There is no
- * performance difference between these scan methods so do not worry.
- * @param scan A configured {@link Scan} object. So if you use this method to fetch a really large
- * result set, it is likely to cause OOM.
- * @return The results of this small scan operation. The return value will be wrapped by a
- * {@link CompletableFuture}.
- */
- CompletableFuture<List<Result>> scanAll(Scan scan);
-
- /**
- * Test for the existence of columns in the table, as specified by the Gets.
- * <p>
- * This will return a list of booleans. Each value will be true if the related Get matches one or
- * more keys, false if not.
- * <p>
- * This is a server-side call so it prevents any data from being transferred to the client.
- * @param gets the Gets
- * @return A list of {@link CompletableFuture}s that represent the existence for each get.
- */
- default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
- return get(toCheckExistenceOnly(gets)).stream()
- .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
- }
-
- /**
- * A simple version for batch exists. It will fail if there are any failures and you will get the
- * whole result boolean list at once if the operation is succeeded.
- * @param gets the Gets
- * @return A {@link CompletableFuture} that wrapper the result boolean list.
- */
- default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
- return allOf(exists(gets));
- }
-
- /**
- * Extracts certain cells from the given rows, in batch.
- * <p>
- * Notice that you may not get all the results with this function, which means some of the
- * returned {@link CompletableFuture}s may succeed while some of the other returned
- * {@link CompletableFuture}s may fail.
- * @param gets The objects that specify what data to fetch and from which rows.
- * @return A list of {@link CompletableFuture}s that represent the result for each get.
- */
- List<CompletableFuture<Result>> get(List<Get> gets);
-
- /**
- * A simple version for batch get. It will fail if there are any failures and you will get the
- * whole result list at once if the operation is succeeded.
- * @param gets The objects that specify what data to fetch and from which rows.
- * @return A {@link CompletableFuture} that wrapper the result list.
- */
- default CompletableFuture<List<Result>> getAll(List<Get> gets) {
- return allOf(get(gets));
- }
-
- /**
- * Puts some data in the table, in batch.
- * @param puts The list of mutations to apply.
- * @return A list of {@link CompletableFuture}s that represent the result for each put.
- */
- List<CompletableFuture<Void>> put(List<Put> puts);
-
- /**
- * A simple version of batch put. It will fail if there are any failures.
- * @param puts The list of mutations to apply.
- * @return A {@link CompletableFuture} that always returns null when complete normally.
- */
- default CompletableFuture<Void> putAll(List<Put> puts) {
- return allOf(put(puts)).thenApply(r -> null);
- }
-
- /**
- * Deletes the specified cells/rows in bulk.
- * @param deletes list of things to delete.
- * @return A list of {@link CompletableFuture}s that represent the result for each delete.
- */
- List<CompletableFuture<Void>> delete(List<Delete> deletes);
-
- /**
- * A simple version of batch delete. It will fail if there are any failures.
- * @param deletes list of things to delete.
- * @return A {@link CompletableFuture} that always returns null when complete normally.
- */
- default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
- return allOf(delete(deletes)).thenApply(r -> null);
- }
-
- /**
- * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of
- * execution of the actions is not defined. Meaning if you do a Put and a Get in the same
- * {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put
- * had put.
- * @param actions list of Get, Put, Delete, Increment, Append objects
- * @return A list of {@link CompletableFuture}s that represent the result for each action.
- */
- <T> List<CompletableFuture<T>> batch(List<? extends Row> actions);
-
- /**
- * A simple version of batch. It will fail if there are any failures and you will get the whole
- * result list at once if the operation is succeeded.
- * @param actions list of Get, Put, Delete, Increment, Append objects
- * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
- */
- default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
- return allOf(batch(actions));
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
index 9c5b092..6632ad5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * For creating {@link AsyncTable} or {@link RawAsyncTable}.
+ * For creating {@link AsyncTable}.
* <p>
* The implementation should have default configurations set before returning the builder to user.
* So users are free to only set the configs they care about to create a new
@@ -32,7 +32,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* @since 2.0.0
*/
@InterfaceAudience.Public
-public interface AsyncTableBuilder<T extends AsyncTableBase> {
+public interface AsyncTableBuilder<C extends ScanResultConsumerBase> {
/**
* Set timeout for a whole operation such as get, put or delete. Notice that scan will not be
@@ -44,7 +44,7 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> {
* @see #setMaxRetries(int)
* @see #setScanTimeout(long, TimeUnit)
*/
- AsyncTableBuilder<T> setOperationTimeout(long timeout, TimeUnit unit);
+ AsyncTableBuilder<C> setOperationTimeout(long timeout, TimeUnit unit);
/**
* As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is
@@ -53,7 +53,7 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> {
* operation in a scan, such as openScanner or next.
* @see #setScanTimeout(long, TimeUnit)
*/
- AsyncTableBuilder<T> setScanTimeout(long timeout, TimeUnit unit);
+ AsyncTableBuilder<C> setScanTimeout(long timeout, TimeUnit unit);
/**
* Set timeout for each rpc request.
@@ -61,23 +61,23 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> {
* Notice that this will <strong>NOT</strong> change the rpc timeout for read(get, scan) request
* and write request(put, delete).
*/
- AsyncTableBuilder<T> setRpcTimeout(long timeout, TimeUnit unit);
+ AsyncTableBuilder<C> setRpcTimeout(long timeout, TimeUnit unit);
/**
* Set timeout for each read(get, scan) rpc request.
*/
- AsyncTableBuilder<T> setReadRpcTimeout(long timeout, TimeUnit unit);
+ AsyncTableBuilder<C> setReadRpcTimeout(long timeout, TimeUnit unit);
/**
* Set timeout for each write(put, delete) rpc request.
*/
- AsyncTableBuilder<T> setWriteRpcTimeout(long timeout, TimeUnit unit);
+ AsyncTableBuilder<C> setWriteRpcTimeout(long timeout, TimeUnit unit);
/**
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
* retrying.
*/
- AsyncTableBuilder<T> setRetryPause(long pause, TimeUnit unit);
+ AsyncTableBuilder<C> setRetryPause(long pause, TimeUnit unit);
/**
* Set the max retry times for an operation. Usually it is the max attempt times minus 1.
@@ -87,7 +87,7 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> {
* @see #setMaxAttempts(int)
* @see #setOperationTimeout(long, TimeUnit)
*/
- default AsyncTableBuilder<T> setMaxRetries(int maxRetries) {
+ default AsyncTableBuilder<C> setMaxRetries(int maxRetries) {
return setMaxAttempts(retries2Attempts(maxRetries));
}
@@ -98,15 +98,15 @@ public interface AsyncTableBuilder<T extends AsyncTableBase> {
* @see #setMaxRetries(int)
* @see #setOperationTimeout(long, TimeUnit)
*/
- AsyncTableBuilder<T> setMaxAttempts(int maxAttempts);
+ AsyncTableBuilder<C> setMaxAttempts(int maxAttempts);
/**
* Set the number of retries that are allowed before we start to log.
*/
- AsyncTableBuilder<T> setStartLogErrorsCnt(int startLogErrorsCnt);
+ AsyncTableBuilder<C> setStartLogErrorsCnt(int startLogErrorsCnt);
/**
- * Create the {@link AsyncTable} or {@link RawAsyncTable} instance.
+ * Create the {@link AsyncTable} instance.
*/
- T build();
+ AsyncTable<C> build();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
index 3fd6bde..ee571f1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
@@ -28,7 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience;
* Base class for all asynchronous table builders.
*/
@InterfaceAudience.Private
-abstract class AsyncTableBuilderBase<T extends AsyncTableBase> implements AsyncTableBuilder<T> {
+abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
+ implements AsyncTableBuilder<C> {
protected TableName tableName;
@@ -51,7 +52,7 @@ abstract class AsyncTableBuilderBase<T extends AsyncTableBase> implements AsyncT
AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration connConf) {
this.tableName = tableName;
this.operationTimeoutNs = tableName.isSystemTable() ? connConf.getMetaOperationTimeoutNs()
- : connConf.getOperationTimeoutNs();
+ : connConf.getOperationTimeoutNs();
this.scanTimeoutNs = connConf.getScanTimeoutNs();
this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs();
@@ -62,49 +63,49 @@ abstract class AsyncTableBuilderBase<T extends AsyncTableBase> implements AsyncT
}
@Override
- public AsyncTableBuilderBase<T> setOperationTimeout(long timeout, TimeUnit unit) {
+ public AsyncTableBuilderBase<C> setOperationTimeout(long timeout, TimeUnit unit) {
this.operationTimeoutNs = unit.toNanos(timeout);
return this;
}
@Override
- public AsyncTableBuilderBase<T> setScanTimeout(long timeout, TimeUnit unit) {
+ public AsyncTableBuilderBase<C> setScanTimeout(long timeout, TimeUnit unit) {
this.scanTimeoutNs = unit.toNanos(timeout);
return this;
}
@Override
- public AsyncTableBuilderBase<T> setRpcTimeout(long timeout, TimeUnit unit) {
+ public AsyncTableBuilderBase<C> setRpcTimeout(long timeout, TimeUnit unit) {
this.rpcTimeoutNs = unit.toNanos(timeout);
return this;
}
@Override
- public AsyncTableBuilderBase<T> setReadRpcTimeout(long timeout, TimeUnit unit) {
+ public AsyncTableBuilderBase<C> setReadRpcTimeout(long timeout, TimeUnit unit) {
this.readRpcTimeoutNs = unit.toNanos(timeout);
return this;
}
@Override
- public AsyncTableBuilderBase<T> setWriteRpcTimeout(long timeout, TimeUnit unit) {
+ public AsyncTableBuilderBase<C> setWriteRpcTimeout(long timeout, TimeUnit unit) {
this.writeRpcTimeoutNs = unit.toNanos(timeout);
return this;
}
@Override
- public AsyncTableBuilderBase<T> setRetryPause(long pause, TimeUnit unit) {
+ public AsyncTableBuilderBase<C> setRetryPause(long pause, TimeUnit unit) {
this.pauseNs = unit.toNanos(pause);
return this;
}
@Override
- public AsyncTableBuilderBase<T> setMaxAttempts(int maxAttempts) {
+ public AsyncTableBuilderBase<C> setMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
}
@Override
- public AsyncTableBuilderBase<T> setStartLogErrorsCnt(int startLogErrorsCnt) {
+ public AsyncTableBuilderBase<C> setStartLogErrorsCnt(int startLogErrorsCnt) {
this.startLogErrorsCnt = startLogErrorsCnt;
return this;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index ae43f5b..c8553c6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -19,34 +19,37 @@ package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
+import com.google.protobuf.RpcChannel;
+
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * The implementation of AsyncTable. Based on {@link RawAsyncTable}.
+ * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a
+ * thread pool when constructing this class, and the callback methods registered to the returned
+ * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users
+ * to do anything they want in the callbacks without breaking the rpc framework.
*/
@InterfaceAudience.Private
-class AsyncTableImpl implements AsyncTable {
+class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
- private final RawAsyncTable rawTable;
+ private final AsyncTable<AdvancedScanResultConsumer> rawTable;
private final ExecutorService pool;
- private final long defaultScannerMaxResultSize;
-
- AsyncTableImpl(AsyncConnectionImpl conn, RawAsyncTable rawTable, ExecutorService pool) {
+ AsyncTableImpl(AsyncConnectionImpl conn, AsyncTable<AdvancedScanResultConsumer> rawTable,
+ ExecutorService pool) {
this.rawTable = rawTable;
this.pool = pool;
- this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
}
@Override
@@ -172,16 +175,9 @@ class AsyncTableImpl implements AsyncTable {
return wrap(rawTable.scanAll(scan));
}
- private long resultSize2CacheSize(long maxResultSize) {
- // * 2 if possible
- return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
- }
-
@Override
public ResultScanner getScanner(Scan scan) {
- return new AsyncTableResultScanner(rawTable, ReflectionUtils.newInstance(scan.getClass(), scan),
- resultSize2CacheSize(
- scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
+ return rawTable.getScanner(scan);
}
private void scan0(Scan scan, ScanResultConsumer consumer) {
@@ -222,4 +218,59 @@ class AsyncTableImpl implements AsyncTable {
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
}
+
+ @Override
+ public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+ ServiceCaller<S, R> callable, byte[] row) {
+ return wrap(rawTable.coprocessorService(stubMaker, callable, row));
+ }
+
+ @Override
+ public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
+ Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
+ CoprocessorCallback<R> callback) {
+ CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() {
+
+ @Override
+ public void onRegionComplete(RegionInfo region, R resp) {
+ pool.execute(() -> callback.onRegionComplete(region, resp));
+ }
+
+ @Override
+ public void onRegionError(RegionInfo region, Throwable error) {
+ pool.execute(() -> callback.onRegionError(region, error));
+ }
+
+ @Override
+ public void onComplete() {
+ pool.execute(() -> callback.onComplete());
+ }
+
+ @Override
+ public void onError(Throwable error) {
+ pool.execute(() -> callback.onError(error));
+ }
+ };
+ CoprocessorServiceBuilder<S, R> builder =
+ rawTable.coprocessorService(stubMaker, callable, wrappedCallback);
+ return new CoprocessorServiceBuilder<S, R>() {
+
+ @Override
+ public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) {
+ builder.fromRow(startKey, inclusive);
+ return this;
+ }
+
+ @Override
+ public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) {
+ builder.toRow(endKey, inclusive);
+ return this;
+ }
+
+ @Override
+ public void execute() {
+ builder.execute();
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index 957f06f..fe9645a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -37,11 +37,11 @@ import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
* {@code 2 * scan.getMaxResultSize()}.
*/
@InterfaceAudience.Private
-class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
+class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsumer {
private static final Log LOG = LogFactory.getLog(AsyncTableResultScanner.class);
- private final RawAsyncTable rawTable;
+ private final AsyncTable<AdvancedScanResultConsumer> rawTable;
private final long maxCacheSize;
@@ -59,7 +59,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
private ScanResumer resumer;
- public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) {
+ public AsyncTableResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan,
+ long maxCacheSize) {
this.rawTable = table;
this.maxCacheSize = maxCacheSize;
this.scan = scan;
@@ -74,8 +75,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
private void stopPrefetch(ScanController controller) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("0x%x", System.identityHashCode(this)) +
- " stop prefetching when scanning " + rawTable.getName() + " as the cache size " +
- cacheSize + " is greater than the maxCacheSize " + maxCacheSize);
+ " stop prefetching when scanning " + rawTable.getName() + " as the cache size " +
+ cacheSize + " is greater than the maxCacheSize " + maxCacheSize);
}
resumer = controller.suspend();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index bc0ade2..780dcf9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -21,9 +21,6 @@ import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetAddress;
@@ -41,26 +38,28 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.DNS;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.DNS;
/**
* Utility used by client connections.
@@ -378,7 +377,7 @@ public final class ConnectionUtils {
}
}
- static boolean noMoreResultsForScan(Scan scan, HRegionInfo info) {
+ static boolean noMoreResultsForScan(Scan scan, RegionInfo info) {
if (isEmptyStopRow(info.getEndKey())) {
return true;
}
@@ -392,7 +391,7 @@ public final class ConnectionUtils {
return c > 0 || (c == 0 && !scan.includeStopRow());
}
- static boolean noMoreResultsForReverseScan(Scan scan, HRegionInfo info) {
+ static boolean noMoreResultsForReverseScan(Scan scan, RegionInfo info) {
if (isEmptyStartRow(info.getStartKey())) {
return true;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index bcf581b..6366cf0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -19,6 +19,11 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcChannel;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -63,7 +68,6 @@ import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
-import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.client.replication.TableCFs;
@@ -83,6 +87,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.yetus.audience.InterfaceAudience;
+
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.io.netty.util.Timeout;
@@ -245,11 +250,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Updat
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcChannel;
-
/**
* The implementation of AsyncAdmin.
* <p>
@@ -263,7 +263,7 @@ import com.google.protobuf.RpcChannel;
* @see AsyncConnection#getAdminBuilder()
*/
@InterfaceAudience.Private
-public class RawAsyncHBaseAdmin implements AsyncAdmin {
+class RawAsyncHBaseAdmin implements AsyncAdmin {
public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class);
@@ -272,7 +272,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
private final HashedWheelTimer retryTimer;
- private final RawAsyncTable metaTable;
+ private final AsyncTable<AdvancedScanResultConsumer> metaTable;
private final long rpcTimeoutNs;
@@ -290,7 +290,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
AsyncAdminBuilderBase builder) {
this.connection = connection;
this.retryTimer = retryTimer;
- this.metaTable = connection.getRawTable(META_TABLE_NAME);
+ this.metaTable = connection.getTable(META_TABLE_NAME);
this.rpcTimeoutNs = builder.rpcTimeoutNs;
this.operationTimeoutNs = builder.operationTimeoutNs;
this.pauseNs = builder.pauseNs;
@@ -1442,8 +1442,8 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
public CompletableFuture<List<QuotaSettings>> getQuota(QuotaFilter filter) {
CompletableFuture<List<QuotaSettings>> future = new CompletableFuture<>();
Scan scan = QuotaTableUtil.makeScan(filter);
- this.connection.getRawTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
- .scan(scan, new RawScanResultConsumer() {
+ this.connection.getTableBuilder(QuotaTableUtil.QUOTA_TABLE_NAME).build()
+ .scan(scan, new AdvancedScanResultConsumer() {
List<QuotaSettings> settings = new ArrayList<>();
@Override
@@ -3001,7 +3001,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
- CoprocessorCallable<S, R> callable) {
+ ServiceCaller<S, R> callable) {
MasterCoprocessorRpcChannelImpl channel =
new MasterCoprocessorRpcChannelImpl(this.<Message> newMasterCaller());
S stub = stubMaker.apply(channel);
@@ -3019,7 +3019,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
- CoprocessorCallable<S, R> callable, ServerName serverName) {
+ ServiceCaller<S, R> callable, ServerName serverName) {
RegionServerCoprocessorRpcChannelImpl channel =
new RegionServerCoprocessorRpcChannelImpl(this.<Message> newServerCaller().serverName(
serverName));
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
deleted file mode 100644
index 102f279..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-
-/**
- * A low level asynchronous table.
- * <p>
- * The implementation is required to be thread safe.
- * <p>
- * The returned {@code CompletableFuture} will be finished directly in the rpc framework's callback
- * thread, so typically you should not do any time consuming work inside these methods, otherwise
- * you will be likely to block at least one connection to RS(even more if the rpc framework uses
- * NIO).
- * <p>
- * So, only experts that want to build high performance service should use this interface directly,
- * especially for the {@link #scan(Scan, RawScanResultConsumer)} below.
- * @since 2.0.0
- */
-@InterfaceAudience.Public
-public interface RawAsyncTable extends AsyncTableBase {
-
- /**
- * The basic scan API uses the observer pattern. All results that match the given scan object will
- * be passed to the given {@code consumer} by calling {@code RawScanResultConsumer.onNext}.
- * {@code RawScanResultConsumer.onComplete} means the scan is finished, and
- * {@code RawScanResultConsumer.onError} means we hit an unrecoverable error and the scan is
- * terminated. {@code RawScanResultConsumer.onHeartbeat} means the RS is still working but we can
- * not get a valid result to call {@code RawScanResultConsumer.onNext}. This is usually because
- * the matched results are too sparse, for example, a filter which almost filters out everything
- * is specified.
- * <p>
- * Notice that, the methods of the given {@code consumer} will be called directly in the rpc
- * framework's callback thread, so typically you should not do any time consuming work inside
- * these methods, otherwise you will be likely to block at least one connection to RS(even more if
- * the rpc framework uses NIO).
- * @param scan A configured {@link Scan} object.
- * @param consumer the consumer used to receive results.
- */
- void scan(Scan scan, RawScanResultConsumer consumer);
-
- /**
- * Delegate to a protobuf rpc call.
- * <p>
- * Usually, it is just a simple lambda expression, like:
- *
- * <pre>
- * <code>
- * (stub, controller, rpcCallback) -> {
- * XXXRequest request = ...; // prepare the request
- * stub.xxx(controller, request, rpcCallback);
- * }
- * </code>
- * </pre>
- *
- * And if you can prepare the {@code request} before calling the coprocessorService method, the
- * lambda expression will be:
- *
- * <pre>
- * <code>
- * (stub, controller, rpcCallback) -> stub.xxx(controller, request, rpcCallback)
- * </code>
- * </pre>
- */
- @InterfaceAudience.Public
- @FunctionalInterface
- interface CoprocessorCallable<S, R> {
-
- /**
- * Represent the actual protobuf rpc call.
- * @param stub the asynchronous stub
- * @param controller the rpc controller, has already been prepared for you
- * @param rpcCallback the rpc callback, has already been prepared for you
- */
- void call(S stub, RpcController controller, RpcCallback<R> rpcCallback);
- }
-
- /**
- * Execute the given coprocessor call on the region which contains the given {@code row}.
- * <p>
- * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
- * one line lambda expression, like:
- *
- * <pre>
- * <code>
- * channel -> xxxService.newStub(channel)
- * </code>
- * </pre>
- *
- * @param stubMaker a delegation to the actual {@code newStub} call.
- * @param callable a delegation to the actual protobuf rpc call. See the comment of
- * {@link CoprocessorCallable} for more details.
- * @param row The row key used to identify the remote region location
- * @param <S> the type of the asynchronous stub
- * @param <R> the type of the return value
- * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
- * @see CoprocessorCallable
- */
- <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
- CoprocessorCallable<S, R> callable, byte[] row);
-
- /**
- * The callback when we want to execute a coprocessor call on a range of regions.
- * <p>
- * As the locating itself also takes some time, the implementation may want to send rpc calls on
- * the fly, which means we do not know how many regions we have when we get the return value of
- * the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
- * passed all the return values to you(through the {@link #onRegionComplete(RegionInfo, Object)}
- * or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e, there will be no
- * {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)}
- * calls in the future.
- * <p>
- * Here is a pseudo code to describe a typical implementation of a range coprocessor service
- * method to help you better understand how the {@link CoprocessorCallback} will be called. The
- * {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the
- * {@code whenComplete} is {@code CompletableFuture.whenComplete}.
- *
- * <pre>
- * locateThenCall(byte[] row) {
- * locate(row).whenComplete((location, locateError) -> {
- * if (locateError != null) {
- * callback.onError(locateError);
- * return;
- * }
- * incPendingCall();
- * region = location.getRegion();
- * if (region.getEndKey() > endKey) {
- * locateEnd = true;
- * } else {
- * locateThenCall(region.getEndKey());
- * }
- * sendCall().whenComplete((resp, error) -> {
- * if (error != null) {
- * callback.onRegionError(region, error);
- * } else {
- * callback.onRegionComplete(region, resp);
- * }
- * if (locateEnd && decPendingCallAndGet() == 0) {
- * callback.onComplete();
- * }
- * });
- * });
- * }
- * </pre>
- */
- @InterfaceAudience.Public
- interface CoprocessorCallback<R> {
-
- /**
- * @param region the region that the response belongs to
- * @param resp the response of the coprocessor call
- */
- void onRegionComplete(RegionInfo region, R resp);
-
- /**
- * @param region the region that the error belongs to
- * @param error the response error of the coprocessor call
- */
- void onRegionError(RegionInfo region, Throwable error);
-
- /**
- * Indicate that all responses of the regions have been notified by calling
- * {@link #onRegionComplete(RegionInfo, Object)} or
- * {@link #onRegionError(RegionInfo, Throwable)}.
- */
- void onComplete();
-
- /**
- * Indicate that we got an error which does not belong to any regions. Usually a locating error.
- */
- void onError(Throwable error);
- }
-
- /**
- * Helper class for sending coprocessorService request that executes a coprocessor call on regions
- * which are covered by a range.
- * <p>
- * If {@code fromRow} is not specified the selection will start with the first table region. If
- * {@code toRow} is not specified the selection will continue through the last table region.
- * @param <S> the type of the protobuf Service you want to call.
- * @param <R> the type of the return value.
- */
- interface CoprocessorServiceBuilder<S, R> {
-
- /**
- * @param startKey start region selection with region containing this row, inclusive.
- */
- default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) {
- return fromRow(startKey, true);
- }
-
- /**
- * @param startKey start region selection with region containing this row
- * @param inclusive whether to include the startKey
- */
- CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive);
-
- /**
- * @param endKey select regions up to and including the region containing this row, exclusive.
- */
- default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) {
- return toRow(endKey, false);
- }
-
- /**
- * @param endKey select regions up to and including the region containing this row
- * @param inclusive whether to include the endKey
- */
- CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive);
-
- /**
- * Execute the coprocessorService request. You can get the response through the
- * {@link CoprocessorCallback}.
- */
- void execute();
- }
-
- /**
- * Execute a coprocessor call on the regions which are covered by a range.
- * <p>
- * Use the returned {@link CoprocessorServiceBuilder} construct your request and then execute it.
- * <p>
- * The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it
- * is only a one line lambda expression, like:
- *
- * <pre>
- * <code>
- * channel -> xxxService.newStub(channel)
- * </code>
- * </pre>
- *
- * @param stubMaker a delegation to the actual {@code newStub} call.
- * @param callable a delegation to the actual protobuf rpc call. See the comment of
- * {@link CoprocessorCallable} for more details.
- * @param callback callback to get the response. See the comment of {@link CoprocessorCallback}
- * for more details.
- */
- <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
- CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback);
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index d4de573..07a2b92 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -62,9 +62,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
/**
* The implementation of RawAsyncTable.
+ * <p>
+ * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
+ * be finished inside the rpc framework thread, which means that the callbacks registered to the
+ * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
+ * this class should not try to do time consuming tasks in the callbacks.
+ * @since 2.0.0
+ * @see AsyncTableImpl
*/
@InterfaceAudience.Private
-class RawAsyncTableImpl implements RawAsyncTable {
+class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private final AsyncConnectionImpl conn;
@@ -102,7 +109,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
this.maxAttempts = builder.maxAttempts;
this.startLogErrorsCnt = builder.startLogErrorsCnt;
this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching()
- : conn.connConf.getScannerCaching();
+ : conn.connConf.getScannerCaching();
this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
}
@@ -270,7 +277,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
@Override
public CheckAndMutateBuilder qualifier(byte[] qualifier) {
this.qualifier = Preconditions.checkNotNull(qualifier, "qualifier is null. Consider using" +
- " an empty byte array, or just do not call this method if you want a null qualifier");
+ " an empty byte array, or just do not call this method if you want a null qualifier");
return this;
}
@@ -290,7 +297,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
private void preCheck() {
Preconditions.checkNotNull(op, "condition is null. You need to specify the condition by" +
- " calling ifNotExists/ifEquals/ifMatches before executing the request");
+ " calling ifNotExists/ifEquals/ifMatches before executing the request");
}
@Override
@@ -354,14 +361,12 @@ class RawAsyncTableImpl implements RawAsyncTable {
} else {
try {
org.apache.hadoop.hbase.client.MultiResponse multiResp =
- ResponseConverter.getResults(req, resp, controller.cellScanner());
+ ResponseConverter.getResults(req, resp, controller.cellScanner());
Throwable ex = multiResp.getException(regionName);
if (ex != null) {
- future
- .completeExceptionally(ex instanceof IOException ? ex
- : new IOException(
- "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()),
- ex));
+ future.completeExceptionally(ex instanceof IOException ? ex
+ : new IOException(
+ "Failed to mutate row: " + Bytes.toStringBinary(mutation.getRow()), ex));
} else {
future.complete(respConverter
.apply((Result) multiResp.getResults().get(regionName).result.get(0)));
@@ -400,11 +405,28 @@ class RawAsyncTableImpl implements RawAsyncTable {
return newScan;
}
+ public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
+ new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
+ maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
+ }
+
+ private long resultSize2CacheSize(long maxResultSize) {
+ // * 2 if possible
+ return maxResultSize > Long.MAX_VALUE / 2 ? maxResultSize : maxResultSize * 2;
+ }
+
+ @Override
+ public ResultScanner getScanner(Scan scan) {
+ return new AsyncTableResultScanner(this, ReflectionUtils.newInstance(scan.getClass(), scan),
+ resultSize2CacheSize(
+ scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize));
+ }
+
@Override
public CompletableFuture<List<Result>> scanAll(Scan scan) {
CompletableFuture<List<Result>> future = new CompletableFuture<>();
List<Result> scanResults = new ArrayList<>();
- scan(scan, new RawScanResultConsumer() {
+ scan(scan, new AdvancedScanResultConsumer() {
@Override
public void onNext(Result[] results, ScanController controller) {
@@ -424,11 +446,6 @@ class RawAsyncTableImpl implements RawAsyncTable {
return future;
}
- public void scan(Scan scan, RawScanResultConsumer consumer) {
- new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
- maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
- }
-
@Override
public List<CompletableFuture<Result>> get(List<Get> gets) {
return batch(gets, readRpcTimeoutNs);
@@ -487,7 +504,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
}
private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
- CoprocessorCallable<S, R> callable, RegionInfo region, byte[] row) {
+ ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
region, row, rpcTimeoutNs, operationTimeoutNs);
S stub = stubMaker.apply(channel);
@@ -505,7 +522,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
@Override
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
- CoprocessorCallable<S, R> callable, byte[] row) {
+ ServiceCaller<S, R> callable, byte[] row) {
return coprocessorService(stubMaker, callable, null, row);
}
@@ -527,7 +544,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
}
private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
- CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback,
+ ServiceCaller<S, R> callable, CoprocessorCallback<R> callback,
List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive,
AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc,
Throwable error) {
@@ -563,7 +580,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
private final Function<RpcChannel, S> stubMaker;
- private final CoprocessorCallable<S, R> callable;
+ private final ServiceCaller<S, R> callable;
private final CoprocessorCallback<R> callback;
@@ -576,7 +593,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
private boolean endKeyInclusive;
public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
- CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback) {
+ ServiceCaller<S, R> callable, CoprocessorCallback<R> callback) {
this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
this.callable = Preconditions.checkNotNull(callable, "callable is null");
this.callback = Preconditions.checkNotNull(callback, "callback is null");
@@ -586,8 +603,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) {
this.startKey = Preconditions.checkNotNull(startKey,
"startKey is null. Consider using" +
- " an empty byte array, or just do not call this method if you want to start selection" +
- " from the first region");
+ " an empty byte array, or just do not call this method if you want to start selection" +
+ " from the first region");
this.startKeyInclusive = inclusive;
return this;
}
@@ -596,8 +613,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) {
this.endKey = Preconditions.checkNotNull(endKey,
"endKey is null. Consider using" +
- " an empty byte array, or just do not call this method if you want to continue" +
- " selection to the last region");
+ " an empty byte array, or just do not call this method if you want to continue" +
+ " selection to the last region");
this.endKeyInclusive = inclusive;
return this;
}
@@ -614,7 +631,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
@Override
public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
- Function<RpcChannel, S> stubMaker, CoprocessorCallable<S, R> callable,
+ Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
CoprocessorCallback<R> callback) {
return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
deleted file mode 100644
index 7ab02d8..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.util.Optional;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-
-/**
- * Receives {@link Result} for an asynchronous scan.
- * <p>
- * Notice that, the {@link #onNext(Result[], ScanController)} method will be called in the thread
- * which we send request to HBase service. So if you want the asynchronous scanner fetch data from
- * HBase in background while you process the returned data, you need to move the processing work to
- * another thread to make the {@code onNext} call return immediately. And please do NOT do any time
- * consuming tasks in all methods below unless you know what you are doing.
- * @since 2.0.0
- */
-@InterfaceAudience.Public
-public interface RawScanResultConsumer {
-
- /**
- * Used to resume a scan.
- */
- @InterfaceAudience.Public
- interface ScanResumer {
-
- /**
- * Resume the scan. You are free to call it multiple time but only the first call will take
- * effect.
- */
- void resume();
- }
-
- /**
- * Used to suspend or stop a scan, or get a scan cursor if available.
- * <p>
- * Notice that, you should only call the {@link #suspend()} or {@link #terminate()} inside onNext
- * or onHeartbeat method. A IllegalStateException will be thrown if you call them at other places.
- * <p>
- * You can only call one of the {@link #suspend()} and {@link #terminate()} methods(of course you
- * are free to not call them both), and the methods are not reentrant. An IllegalStateException
- * will be thrown if you have already called one of the methods.
- */
- @InterfaceAudience.Public
- interface ScanController {
-
- /**
- * Suspend the scan.
- * <p>
- * This means we will stop fetching data in background, i.e., will not call onNext any more
- * before you resume the scan.
- * @return A resumer used to resume the scan later.
- */
- ScanResumer suspend();
-
- /**
- * Terminate the scan.
- * <p>
- * This is useful when you have got enough results and want to stop the scan in onNext method,
- * or you want to stop the scan in onHeartbeat method because it has spent too many time.
- */
- void terminate();
-
- /**
- * Get the scan cursor if available.
- * @return The scan cursor.
- */
- Optional<Cursor> cursor();
- }
-
- /**
- * Indicate that we have receive some data.
- * @param results the data fetched from HBase service.
- * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
- * instance is only valid within scope of onNext method. You can only call its method in
- * onNext, do NOT store it and call it later outside onNext.
- */
- void onNext(Result[] results, ScanController controller);
-
- /**
- * Indicate that there is a heartbeat message but we have not cumulated enough cells to call
- * {@link #onNext(Result[], ScanController)}.
- * <p>
- * Note that this method will always be called when RS returns something to us but we do not have
- * enough cells to call {@link #onNext(Result[], ScanController)}. Sometimes it may not be a
- * 'heartbeat' message for RS, for example, we have a large row with many cells and size limit is
- * exceeded before sending all the cells for this row. For RS it does send some data to us and the
- * time limit has not been reached, but we can not return the data to client so here we call this
- * method to tell client we have already received something.
- * <p>
- * This method give you a chance to terminate a slow scan operation.
- * @param controller used to suspend or terminate the scan. Notice that the {@code controller}
- * instance is only valid within the scope of onHeartbeat method. You can only call its
- * method in onHeartbeat, do NOT store it and call it later outside onHeartbeat.
- */
- default void onHeartbeat(ScanController controller) {
- }
-
- /**
- * Indicate that we hit an unrecoverable error and the scan operation is terminated.
- * <p>
- * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
- */
- void onError(Throwable error);
-
- /**
- * Indicate that the scan operation is completed normally.
- */
- void onComplete();
-
- /**
- * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to
- * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan
- * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can
- * store it somewhere to get the metrics at any time if you want.
- */
- default void onScanMetricsCreated(ScanMetrics scanMetrics) {
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
index 826a8ef..be3108b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumer.java
@@ -18,38 +18,20 @@
package org.apache.hadoop.hbase.client;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
/**
* Receives {@link Result} for an asynchronous scan.
+ * <p>
+ * All results that match the given scan object will be passed to this class by calling
+ * {@link #onNext(Result)}. {@link #onComplete()} means the scan is finished, and
+ * {@link #onError(Throwable)} means we hit an unrecoverable error and the scan is terminated.
*/
@InterfaceAudience.Public
-public interface ScanResultConsumer {
+public interface ScanResultConsumer extends ScanResultConsumerBase {
/**
* @param result the data fetched from HBase service.
* @return {@code false} if you want to terminate the scan process. Otherwise {@code true}
*/
boolean onNext(Result result);
-
- /**
- * Indicate that we hit an unrecoverable error and the scan operation is terminated.
- * <p>
- * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
- */
- void onError(Throwable error);
-
- /**
- * Indicate that the scan operation is completed normally.
- */
- void onComplete();
-
- /**
- * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to
- * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan
- * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can
- * store it somewhere to get the metrics at any time if you want.
- */
- default void onScanMetricsCreated(ScanMetrics scanMetrics) {
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumerBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumerBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumerBase.java
new file mode 100644
index 0000000..538cf9d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScanResultConsumerBase.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The base interface for scan result consumer.
+ */
+@InterfaceAudience.Public
+public interface ScanResultConsumerBase {
+ /**
+ * Indicate that we hit an unrecoverable error and the scan operation is terminated.
+ * <p>
+ * We will not call {@link #onComplete()} after calling {@link #onError(Throwable)}.
+ */
+ void onError(Throwable error);
+
+ /**
+ * Indicate that the scan operation is completed normally.
+ */
+ void onComplete();
+
+ /**
+ * If {@code scan.isScanMetricsEnabled()} returns true, then this method will be called prior to
+ * all other methods in this interface to give you the {@link ScanMetrics} instance for this scan
+ * operation. The {@link ScanMetrics} instance will be updated on-the-fly during the scan, you can
+ * store it somewhere to get the metrics at any time if you want.
+ */
+ default void onScanMetricsCreated(ScanMetrics scanMetrics) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServiceCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServiceCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServiceCaller.java
new file mode 100644
index 0000000..467f1a2
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServiceCaller.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Delegate to a protobuf rpc call.
+ * <p>
+ * Usually, it is just a simple lambda expression, like:
+ *
+ * <pre>
+ * <code>
+ * (stub, controller, rpcCallback) -> {
+ * XXXRequest request = ...; // prepare the request
+ * stub.xxx(controller, request, rpcCallback);
+ * }
+ * </code>
+ * </pre>
+ *
+ * And if already have the {@code request}, the lambda expression will be:
+ *
+ * <pre>
+ * <code>
+ * (stub, controller, rpcCallback) -> stub.xxx(controller, request, rpcCallback)
+ * </code>
+ * </pre>
+ *
+ * @param <S> the type of the protobuf Service you want to call.
+ * @param <R> the type of the return value.
+ */
+@InterfaceAudience.Public
+@FunctionalInterface
+public interface ServiceCaller<S, R> {
+
+ /**
+ * Represent the actual protobuf rpc call.
+ * @param stub the asynchronous stub
+ * @param controller the rpc controller, has already been prepared for you
+ * @param rpcCallback the rpc callback, has already been prepared for you
+ */
+ void call(S stub, RpcController controller, RpcCallback<R> rpcCallback);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
index ff9b873..371e865 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java
@@ -32,9 +32,9 @@ import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.RawAsyncTable;
-import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback;
-import org.apache.hadoop.hbase.client.RawScanResultConsumer;
+import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.AsyncTable.CoprocessorCallback;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -126,7 +126,7 @@ public class AsyncAggregationClient {
}
public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
- max(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+ max(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<R> future = new CompletableFuture<>();
AggregateRequest req;
try {
@@ -163,7 +163,7 @@ public class AsyncAggregationClient {
}
public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
- min(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+ min(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<R> future = new CompletableFuture<>();
AggregateRequest req;
try {
@@ -201,7 +201,7 @@ public class AsyncAggregationClient {
public static <R, S, P extends Message, Q extends Message, T extends Message>
CompletableFuture<Long>
- rowCount(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+ rowCount(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<Long> future = new CompletableFuture<>();
AggregateRequest req;
try {
@@ -233,7 +233,7 @@ public class AsyncAggregationClient {
}
public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<S>
- sum(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+ sum(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<S> future = new CompletableFuture<>();
AggregateRequest req;
try {
@@ -269,7 +269,7 @@ public class AsyncAggregationClient {
public static <R, S, P extends Message, Q extends Message, T extends Message>
CompletableFuture<Double>
- avg(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+ avg(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<Double> future = new CompletableFuture<>();
AggregateRequest req;
try {
@@ -307,7 +307,7 @@ public class AsyncAggregationClient {
public static <R, S, P extends Message, Q extends Message, T extends Message>
CompletableFuture<Double>
- std(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+ std(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<Double> future = new CompletableFuture<>();
AggregateRequest req;
try {
@@ -351,7 +351,7 @@ public class AsyncAggregationClient {
// the map key is the startRow of the region
private static <R, S, P extends Message, Q extends Message, T extends Message>
CompletableFuture<NavigableMap<byte[], S>>
- sumByRegion(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+ sumByRegion(AsyncTable<?> table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<NavigableMap<byte[], S>> future =
new CompletableFuture<NavigableMap<byte[], S>>();
AggregateRequest req;
@@ -388,8 +388,8 @@ public class AsyncAggregationClient {
}
private static <R, S, P extends Message, Q extends Message, T extends Message> void findMedian(
- CompletableFuture<R> future, RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci,
- Scan scan, NavigableMap<byte[], S> sumByRegion) {
+ CompletableFuture<R> future, AsyncTable<AdvancedScanResultConsumer> table,
+ ColumnInterpreter<R, S, P, Q, T> ci, Scan scan, NavigableMap<byte[], S> sumByRegion) {
double halfSum = ci.divideForAvg(sumByRegion.values().stream().reduce(ci::add).get(), 2L);
S movingSum = null;
byte[] startRow = null;
@@ -410,7 +410,7 @@ public class AsyncAggregationClient {
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
byte[] weightQualifier = qualifiers.last();
byte[] valueQualifier = qualifiers.first();
- table.scan(scan, new RawScanResultConsumer() {
+ table.scan(scan, new AdvancedScanResultConsumer() {
private S sum = baseSum;
@@ -456,8 +456,9 @@ public class AsyncAggregationClient {
});
}
- public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
- median(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
+ public static <R, S, P extends Message, Q extends Message, T extends Message>
+ CompletableFuture<R> median(AsyncTable<AdvancedScanResultConsumer> table,
+ ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<R> future = new CompletableFuture<>();
sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> {
if (error != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
index 389aaaf..12e5b8d 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java
@@ -55,7 +55,7 @@ public class TestAsyncAggregationClient {
private static AsyncConnection CONN;
- private static RawAsyncTable TABLE;
+ private static AsyncTable<AdvancedScanResultConsumer> TABLE;
@BeforeClass
public static void setUp() throws Exception {
@@ -69,7 +69,7 @@ public class TestAsyncAggregationClient {
}
UTIL.createTable(TABLE_NAME, CF, splitKeys);
CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
- TABLE = CONN.getRawTable(TABLE_NAME);
+ TABLE = CONN.getTable(TABLE_NAME);
TABLE.putAll(LongStream.range(0, COUNT)
.mapToObj(l -> new Put(Bytes.toBytes(String.format("%03d", l)))
.addColumn(CF, CQ, Bytes.toBytes(l)).addColumn(CF, CQ2, Bytes.toBytes(l * l)))
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
index 2105547..67aba62 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java
@@ -141,7 +141,7 @@ public class AsyncClientExample extends Configured implements Tool {
latch.countDown();
return;
}
- AsyncTable table = conn.getTable(tableName, threadPool);
+ AsyncTable<?> table = conn.getTable(tableName, threadPool);
table.put(new Put(getKey(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i)))
.whenComplete((putResp, putErr) -> {
if (putErr != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/54827cf6/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
index bb83bac..e3686f4 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java
@@ -17,9 +17,24 @@
*/
package org.apache.hadoop.hbase.client.example;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
+import org.apache.hadoop.hbase.util.Bytes;
+
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-
import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.ServerBootstrap;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
@@ -43,26 +58,10 @@ import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.HttpVersion;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.QueryStringDecoder;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.GlobalEventExecutor;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.AsyncConnection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RawAsyncTable;
-import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
-import org.apache.hadoop.hbase.util.Bytes;
-
/**
- * A simple example on how to use {@link RawAsyncTable} to write a fully asynchronous HTTP proxy
- * server. The {@link AsyncConnection} will share the same event loop with the HTTP server.
+ * A simple example on how to use {@link org.apache.hadoop.hbase.client.AsyncTable} to write a fully
+ * asynchronous HTTP proxy server. The {@link AsyncConnection} will share the same event loop with
+ * the HTTP server.
* <p>
* The request URL is:
*
@@ -160,7 +159,7 @@ public class HttpProxyExample {
private void get(ChannelHandlerContext ctx, FullHttpRequest req) {
Params params = parse(req);
- conn.getRawTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
+ conn.getTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row))
.addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier)))
.whenComplete((r, e) -> {
if (e != null) {
@@ -181,7 +180,7 @@ public class HttpProxyExample {
Params params = parse(req);
byte[] value = new byte[req.content().readableBytes()];
req.content().readBytes(value);
- conn.getRawTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
+ conn.getTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row))
.addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value))
.whenComplete((r, e) -> {
if (e != null) {