You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/12/24 04:10:47 UTC
[3/3] hbase git commit: HBASE-17174 Refactor the AsyncProcess,
BufferedMutatorImpl, and HTable
HBASE-17174 Refactor the AsyncProcess, BufferedMutatorImpl, and HTable
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8cb55c40
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8cb55c40
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8cb55c40
Branch: refs/heads/master
Commit: 8cb55c4080206a651023f6d042fac295192f1c2b
Parents: 992e571
Author: ChiaPing Tsai <ch...@gmail.com>
Authored: Sat Dec 24 12:02:05 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Dec 24 12:02:05 2016 +0800
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncProcess.java | 704 +++--------------
.../hadoop/hbase/client/AsyncProcessTask.java | 229 ++++++
.../hbase/client/AsyncRequestFutureImpl.java | 46 +-
.../hbase/client/BufferedMutatorImpl.java | 165 ++--
.../hbase/client/BufferedMutatorParams.java | 21 +-
.../hbase/client/ConnectionConfiguration.java | 19 +-
.../hbase/client/ConnectionImplementation.java | 13 +-
.../org/apache/hadoop/hbase/client/HTable.java | 295 ++++---
.../hadoop/hbase/client/HTableMultiplexer.java | 15 +-
.../hadoop/hbase/client/RequestController.java | 125 +++
.../hbase/client/RequestControllerFactory.java | 44 ++
.../apache/hadoop/hbase/client/RowAccess.java | 3 +-
.../hbase/client/SimpleRequestController.java | 519 +++++++++++++
.../hadoop/hbase/client/TestAsyncProcess.java | 769 ++++++++-----------
.../client/TestSimpleRequestController.java | 336 ++++++++
.../hbase/client/HConnectionTestingUtility.java | 7 +-
.../hadoop/hbase/client/TestClientPushback.java | 19 +-
.../hadoop/hbase/client/TestReplicasClient.java | 14 +-
.../regionserver/TestPerColumnFamilyFlush.java | 1 -
.../security/access/TestTablePermissions.java | 72 +-
20 files changed, 2128 insertions(+), 1288 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 50a2a11..d1583f5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -19,45 +19,35 @@
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
+import org.apache.hadoop.hbase.client.RequestController.ReturnCode;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdge;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* This class allows a continuous flow of requests. It's written to be compatible with a
@@ -95,9 +85,10 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* </p>
*/
@InterfaceAudience.Private
+@InterfaceStability.Evolving
class AsyncProcess {
private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
- protected static final AtomicLong COUNTER = new AtomicLong();
+ private static final AtomicLong COUNTER = new AtomicLong();
public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
@@ -116,31 +107,6 @@ class AsyncProcess {
*/
public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
- protected final int thresholdToLogUndoneTaskDetails;
- private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
- "hbase.client.threshold.log.details";
- private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
- private static final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
-
- /**
- * The maximum size of single RegionServer.
- */
- public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize";
-
- /**
- * Default value of #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE
- */
- public static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;
-
- /**
- * The maximum size of submit.
- */
- public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
- /**
- * Default value of #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE
- */
- public static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
-
/**
* Return value from a submit that didn't contain any requests.
*/
@@ -173,64 +139,42 @@ class AsyncProcess {
};
// TODO: many of the fields should be made private
- protected final long id;
-
- protected final ClusterConnection connection;
- protected final RpcRetryingCallerFactory rpcCallerFactory;
- protected final RpcControllerFactory rpcFactory;
- protected final BatchErrors globalErrors;
- protected final ExecutorService pool;
-
- protected final AtomicLong tasksInProgress = new AtomicLong(0);
- protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
- new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
- protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
- new ConcurrentHashMap<ServerName, AtomicInteger>();
- // Start configuration settings.
- protected final int startLogErrorsCnt;
+ final long id;
- /**
- * The number of tasks simultaneously executed on the cluster.
- */
- protected final int maxTotalConcurrentTasks;
+ final ClusterConnection connection;
+ private final RpcRetryingCallerFactory rpcCallerFactory;
+ final RpcControllerFactory rpcFactory;
+ final BatchErrors globalErrors;
- /**
- * The max heap size of all tasks simultaneously executed on a server.
- */
- protected final long maxHeapSizePerRequest;
- protected final long maxHeapSizeSubmit;
- /**
- * The number of tasks we run in parallel on a single region.
- * With 1 (the default) , we ensure that the ordering of the queries is respected: we don't start
- * a set of operations on a region before the previous one is done. As well, this limits
- * the pressure we put on the region server.
- */
- protected final int maxConcurrentTasksPerRegion;
+ // Start configuration settings.
+ final int startLogErrorsCnt;
- /**
- * The number of task simultaneously executed on a single region server.
- */
- protected final int maxConcurrentTasksPerServer;
- protected final long pause;
- protected final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
- protected int numTries;
- protected int serverTrackerTimeout;
- protected int rpcTimeout;
- protected int operationTimeout;
- protected long primaryCallTimeoutMicroseconds;
+ final long pause;
+ final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
+ final int numTries;
+ @VisibleForTesting
+ int serverTrackerTimeout;
+ final long primaryCallTimeoutMicroseconds;
/** Whether to log details for batch errors */
- protected final boolean logBatchErrorDetails;
+ final boolean logBatchErrorDetails;
// End configuration settings.
- public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
+ /**
+ * The traffic control for requests.
+ */
+ @VisibleForTesting
+ final RequestController requestController;
+ public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms";
+ private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000;
+ private final int periodToLog;
+ AsyncProcess(ClusterConnection hc, Configuration conf,
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
- RpcControllerFactory rpcFactory, int rpcTimeout, int operationTimeout) {
+ RpcControllerFactory rpcFactory) {
if (hc == null) {
throw new IllegalArgumentException("ClusterConnection cannot be null.");
}
this.connection = hc;
- this.pool = pool;
this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
this.id = COUNTER.incrementAndGet();
@@ -249,42 +193,10 @@ class AsyncProcess {
// how many times we could try in total, one more than retry number
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
- this.rpcTimeout = rpcTimeout;
- this.operationTimeout = operationTimeout;
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
-
- this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
- HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
- this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
- HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
- this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
- HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
- this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
- DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
- this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
this.startLogErrorsCnt =
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
-
- if (this.maxTotalConcurrentTasks <= 0) {
- throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
- }
- if (this.maxConcurrentTasksPerServer <= 0) {
- throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
- maxConcurrentTasksPerServer);
- }
- if (this.maxConcurrentTasksPerRegion <= 0) {
- throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
- maxConcurrentTasksPerRegion);
- }
- if (this.maxHeapSizePerRequest <= 0) {
- throw new IllegalArgumentException("maxHeapSizePerServer=" +
- maxHeapSizePerRequest);
- }
-
- if (this.maxHeapSizeSubmit <= 0) {
- throw new IllegalArgumentException("maxHeapSizeSubmit=" +
- maxHeapSizeSubmit);
- }
+ this.periodToLog = conf.getInt(LOG_DETAILS_PERIOD, DEFAULT_LOG_DETAILS_PERIOD);
// Server tracker allows us to do faster, and yet useful (hopefully), retries.
// However, if we are too useful, we might fail very quickly due to retry count limit.
// To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
@@ -301,43 +213,30 @@ class AsyncProcess {
this.rpcFactory = rpcFactory;
this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);
- this.thresholdToLogUndoneTaskDetails =
- conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
- DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
+ this.requestController = RequestControllerFactory.create(conf);
}
/**
- * @return pool if non null, otherwise returns this.pool if non null, otherwise throws
- * RuntimeException
+ * The submitted task may not be accomplished at all if there are too many running tasks or
+ * other limits are reached.
+ * @param <CResult> The class to cast the result
+ * @param task The setting and data
+ * @return AsyncRequestFuture
*/
- protected ExecutorService getPool(ExecutorService pool) {
- if (pool != null) {
- return pool;
- }
- if (this.pool != null) {
- return this.pool;
+ public <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task) throws InterruptedIOException {
+ AsyncRequestFuture reqFuture = checkTask(task);
+ if (reqFuture != null) {
+ return reqFuture;
+ }
+ SubmittedRows submittedRows = task.getSubmittedRows() == null ? SubmittedRows.ALL : task.getSubmittedRows();
+ switch (submittedRows) {
+ case ALL:
+ return submitAll(task);
+ case AT_LEAST_ONE:
+ return submit(task, true);
+ default:
+ return submit(task, false);
}
- throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
- }
-
- /**
- * See #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean).
- * Uses default ExecutorService for this AP (must have been created with one).
- */
- public <CResult> AsyncRequestFuture submit(TableName tableName,
- final RowAccess<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
- boolean needResults) throws InterruptedIOException {
- return submit(null, tableName, rows, atLeastOne, callback, needResults);
- }
- /**
- * See {@link #submit(ExecutorService, TableName, RowAccess, boolean, Batch.Callback, boolean)}.
- * Uses the {@link ListRowAccess} to wrap the {@link List}.
- */
- public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
- List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
- boolean needResults) throws InterruptedIOException {
- return submit(pool, tableName, new ListRowAccess(rows), atLeastOne,
- callback, needResults);
}
/**
@@ -345,20 +244,13 @@ class AsyncProcess {
* list. Does not send requests to replicas (not currently used for anything other
* than streaming puts anyway).
*
- * @param pool ExecutorService to use.
- * @param tableName The table for which this request is needed.
- * @param callback Batch callback. Only called on success (94 behavior).
- * @param needResults Whether results are needed, or can be discarded.
- * @param rows - the submitted row. Modified by the method: we remove the rows we took.
+ * @param task The setting and data
* @param atLeastOne true if we should submit at least a subset.
*/
- public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
- RowAccess<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
- boolean needResults) throws InterruptedIOException {
- if (rows.isEmpty()) {
- return NO_REQS_RESULT;
- }
-
+ private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task,
+ boolean atLeastOne) throws InterruptedIOException {
+ TableName tableName = task.getTableName();
+ RowAccess<? extends Row> rows = task.getRowAccess();
Map<ServerName, MultiAction> actionsByServer =
new HashMap<ServerName, MultiAction>();
List<Action> retainedActions = new ArrayList<Action>(rows.size());
@@ -369,11 +261,11 @@ class AsyncProcess {
// Location errors that happen before we decide what requests to take.
List<Exception> locationErrors = null;
List<Integer> locationErrorRows = null;
- RowCheckerHost checker = createRowCheckerHost();
+ RequestController.Checker checker = requestController.newChecker();
boolean firstIter = true;
do {
// Wait until there is at least one slot for a new task.
- waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
+ requestController.waitForFreeSlot(id, periodToLog, getLogger(tableName, -1));
int posInList = -1;
if (!firstIter) {
checker.reset();
@@ -406,8 +298,7 @@ class AsyncProcess {
it.remove();
break; // Backward compat: we stop considering actions on location error.
}
- long rowSize = (r instanceof Mutation) ? ((Mutation) r).heapSize() : 0;
- ReturnCode code = checker.canTakeOperation(loc, rowSize);
+ ReturnCode code = checker.canTakeRow(loc, r);
if (code == ReturnCode.END) {
break;
}
@@ -426,29 +317,14 @@ class AsyncProcess {
if (retainedActions.isEmpty()) return NO_REQS_RESULT;
- return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
- locationErrors, locationErrorRows, actionsByServer, pool);
+ return submitMultiActions(task, retainedActions, nonceGroup,
+ locationErrors, locationErrorRows, actionsByServer);
}
- private RowCheckerHost createRowCheckerHost() {
- return new RowCheckerHost(Arrays.asList(
- new TaskCountChecker(maxTotalConcurrentTasks,
- maxConcurrentTasksPerServer,
- maxConcurrentTasksPerRegion,
- tasksInProgress,
- taskCounterPerServer,
- taskCounterPerRegion)
- , new RequestSizeChecker(maxHeapSizePerRequest)
- , new SubmittedSizeChecker(maxHeapSizeSubmit)
- ));
- }
- <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
- List<Action> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
- Object[] results, boolean needResults, List<Exception> locationErrors,
- List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer,
- ExecutorService pool) {
- AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
- tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1);
+ <CResult> AsyncRequestFuture submitMultiActions(AsyncProcessTask task,
+ List<Action> retainedActions, long nonceGroup, List<Exception> locationErrors,
+ List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer) {
+ AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(task, retainedActions, nonceGroup);
// Add location errors if any
if (locationErrors != null) {
for (int i = 0; i < locationErrors.size(); ++i) {
@@ -462,14 +338,6 @@ class AsyncProcess {
return ars;
}
- public void setRpcTimeout(int rpcTimeout) {
- this.rpcTimeout = rpcTimeout;
- }
-
- public void setOperationTimeout(int operationTimeout) {
- this.operationTimeout = operationTimeout;
- }
-
/**
* Helper that is used when grouping the actions per region server.
*
@@ -493,24 +361,13 @@ class AsyncProcess {
multiAction.add(regionName, action);
}
- public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
- List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
- return submitAll(pool, tableName, rows, callback, results, null, -1);
- }
/**
* Submit immediately the list of rows, whatever the server status. Kept for backward
* compatibility: it allows to be used with the batch interface that return an array of objects.
- *
- * @param pool ExecutorService to use.
- * @param tableName name of the table for which the submission is made.
- * @param rows the list of rows.
- * @param callback the callback.
- * @param results Optional array to return the results thru; backward compat.
- * @param rpcTimeout rpc timeout for this batch, set -1 if want to use current setting.
+ * @param task The setting and data
*/
- public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
- List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
- CancellableRegionServerCallable callable, int rpcTimeout) {
+ private <CResult> AsyncRequestFuture submitAll(AsyncProcessTask task) {
+ RowAccess<? extends Row> rows = task.getRowAccess();
List<Action> actions = new ArrayList<Action>(rows.size());
// The position will be used by the processBatch to match the object array returned.
@@ -528,93 +385,78 @@ class AsyncProcess {
setNonce(ng, r, action);
actions.add(action);
}
- AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
- tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null,
- callable, rpcTimeout);
+ AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(task, actions, ng.getNonceGroup());
ars.groupAndSendMultiAction(actions, 1);
return ars;
}
+ private <CResult> AsyncRequestFuture checkTask(AsyncProcessTask<CResult> task) {
+ if (task.getRowAccess() == null || task.getRowAccess().isEmpty()) {
+ return NO_REQS_RESULT;
+ }
+ Objects.requireNonNull(task.getPool(), "The pool can't be NULL");
+ checkOperationTimeout(task.getOperationTimeout());
+ checkRpcTimeout(task.getRpcTimeout());
+ return null;
+ }
+
private void setNonce(NonceGenerator ng, Row r, Action action) {
if (!(r instanceof Append) && !(r instanceof Increment)) return;
action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
}
- protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
- TableName tableName, List<Action> actions, long nonceGroup, ExecutorService pool,
- Batch.Callback<CResult> callback, Object[] results, boolean needResults,
- CancellableRegionServerCallable callable, int rpcTimeout) {
- return new AsyncRequestFutureImpl<CResult>(
- tableName, actions, nonceGroup, getPool(pool), needResults,
- results, callback, callable, operationTimeout,
- rpcTimeout > 0 ? rpcTimeout : this.rpcTimeout, this);
+ /** Returns {@code timeout} unchanged; throws if it is negative (zero is accepted). {@code name} labels the error. */
+ private int checkTimeout(String name, int timeout) {
+ if (timeout < 0) {
+ throw new IllegalArgumentException("The " + name + " must not be negative, current value is " + timeout);
+ }
+ return timeout;
+ }
+ private int checkOperationTimeout(int operationTimeout) {
+ return checkTimeout("operation timeout", operationTimeout);
+ }
+
+ private int checkRpcTimeout(int rpcTimeout) {
+ return checkTimeout("rpc timeout", rpcTimeout);
+ }
+
+ @VisibleForTesting
+ <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
+ AsyncProcessTask task, List<Action> actions, long nonceGroup) {
+ return new AsyncRequestFutureImpl<>(task, actions, nonceGroup, this);
}
/** Wait until the async does not have more than max tasks in progress. */
- protected void waitForMaximumCurrentTasks(int max, String tableName)
+ protected void waitForMaximumCurrentTasks(int max, TableName tableName)
throws InterruptedIOException {
- waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
+ requestController.waitForMaximumCurrentTasks(max, id, periodToLog,
+ getLogger(tableName, max));
}
- // Break out this method so testable
- @VisibleForTesting
- void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
- String tableName) throws InterruptedIOException {
- long lastLog = EnvironmentEdgeManager.currentTime();
- long currentInProgress, oldInProgress = Long.MAX_VALUE;
- while ((currentInProgress = tasksInProgress.get()) > max) {
- if (oldInProgress != currentInProgress) { // Wait for in progress to change.
- long now = EnvironmentEdgeManager.currentTime();
- if (now > lastLog + 10000) {
- lastLog = now;
- LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
- + max + ", tasksInProgress=" + currentInProgress +
- " hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName);
- if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
- logDetailsOfUndoneTasks(currentInProgress);
- }
- }
- }
- oldInProgress = currentInProgress;
- try {
- synchronized (tasksInProgress) {
- if (tasksInProgress.get() == oldInProgress) {
- tasksInProgress.wait(10);
- }
- }
- } catch (InterruptedException e) {
- throw new InterruptedIOException("#" + id + ", interrupted." +
- " currentNumberOfTask=" + currentInProgress);
- }
- }
+ /** Builds the logging callback passed to the RequestController wait calls; max < 0 selects the "any free slot" text. */
+ private Consumer<Long> getLogger(TableName tableName, long max) {
+ return (currentInProgress) -> LOG.info("#" + id
+ + (max < 0 ? ", waiting for any free slot" : ", waiting for some tasks to finish. Expected max=" + max)
+ + ", tasksInProgress=" + currentInProgress + " hasError=" + hasError()
+ // '+' binds tighter than '==': without these parentheses the whole message was compared to null.
+ + (tableName == null ? "" : ", tableName=" + tableName));
}
- void logDetailsOfUndoneTasks(long taskInProgress) {
- ArrayList<ServerName> servers = new ArrayList<ServerName>();
- for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
- if (entry.getValue().get() > 0) {
- servers.add(entry.getKey());
- }
- }
- LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
- if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
- ArrayList<String> regions = new ArrayList<String>();
- for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
- if (entry.getValue().get() > 0) {
- regions.add(Bytes.toString(entry.getKey()));
- }
- }
- LOG.info("Regions against which left over task(s) are processed: " + regions);
- }
+ void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
+ requestController.incTaskCounters(regions, sn);
}
+
+ void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
+ requestController.decTaskCounters(regions, sn);
+ }
/**
* Only used w/useGlobalErrors ctor argument, for HTable backward compat.
* @return Whether there were any errors in any request since the last time
* {@link #waitForAllPreviousOpsAndReset(List, String)} was called, or AP was created.
*/
public boolean hasError() {
- return globalErrors.hasErrors();
+ return globalErrors != null && globalErrors.hasErrors();
}
/**
@@ -628,9 +470,9 @@ class AsyncProcess {
* was called, or AP was created.
*/
public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
- List<Row> failedRows, String tableName) throws InterruptedIOException {
+ List<Row> failedRows, TableName tableName) throws InterruptedIOException {
waitForMaximumCurrentTasks(0, tableName);
- if (!globalErrors.hasErrors()) {
+ if (globalErrors == null || !globalErrors.hasErrors()) {
return null;
}
if (failedRows != null) {
@@ -642,41 +484,12 @@ class AsyncProcess {
}
/**
- * increment the tasks counters for a given set of regions. MT safe.
- */
- protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
- tasksInProgress.incrementAndGet();
-
- computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();
-
- for (byte[] regBytes : regions) {
- computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet();
- }
- }
-
- /**
- * Decrements the counters for a given region and the region server. MT Safe.
- */
- protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
- for (byte[] regBytes : regions) {
- AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
- regionCnt.decrementAndGet();
- }
-
- taskCounterPerServer.get(sn).decrementAndGet();
- tasksInProgress.decrementAndGet();
- synchronized (tasksInProgress) {
- tasksInProgress.notifyAll();
- }
- }
-
- /**
* Create a caller. Isolated to be easily overridden in the tests.
*/
@VisibleForTesting
protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable callable, int rpcTimeout) {
- return rpcCallerFactory.<AbstractResponse> newCaller(rpcTimeout);
+ return rpcCallerFactory.<AbstractResponse> newCaller(checkRpcTimeout(rpcTimeout));
}
@@ -687,7 +500,7 @@ class AsyncProcess {
* We may benefit from connection-wide tracking of server errors.
* @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
*/
- protected ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
+ ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
return new ConnectionImplementation.ServerErrorTracker(
this.serverTrackerTimeout, this.numTries);
}
@@ -696,283 +509,4 @@ class AsyncProcess {
return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
}
- /**
- * Collect all advices from checkers and make the final decision.
- */
- @VisibleForTesting
- static class RowCheckerHost {
- private final List<RowChecker> checkers;
- private boolean isEnd = false;
- RowCheckerHost(final List<RowChecker> checkers) {
- this.checkers = checkers;
- }
- void reset() throws InterruptedIOException {
- isEnd = false;
- InterruptedIOException e = null;
- for (RowChecker checker : checkers) {
- try {
- checker.reset();
- } catch (InterruptedIOException ex) {
- e = ex;
- }
- }
- if (e != null) {
- throw e;
- }
- }
- ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
- if (isEnd) {
- return ReturnCode.END;
- }
- ReturnCode code = ReturnCode.INCLUDE;
- for (RowChecker checker : checkers) {
- switch (checker.canTakeOperation(loc, rowSize)) {
- case END:
- isEnd = true;
- code = ReturnCode.END;
- break;
- case SKIP:
- code = ReturnCode.SKIP;
- break;
- case INCLUDE:
- default:
- break;
- }
- if (code == ReturnCode.END) {
- break;
- }
- }
- for (RowChecker checker : checkers) {
- checker.notifyFinal(code, loc, rowSize);
- }
- return code;
- }
- }
-
- /**
- * Provide a way to control the flow of rows iteration.
- */
- // Visible for Testing. Adding @VisibleForTesting here doesn't work for some reason.
- interface RowChecker {
- enum ReturnCode {
- /**
- * Accept current row.
- */
- INCLUDE,
- /**
- * Skip current row.
- */
- SKIP,
- /**
- * No more row can be included.
- */
- END
- };
- ReturnCode canTakeOperation(HRegionLocation loc, long rowSize);
- /**
- * Add the final ReturnCode to the checker.
- * The ReturnCode may be reversed, so the checker need the final decision to update
- * the inner state.
- */
- void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize);
- /**
- * Reset the inner state.
- */
- void reset() throws InterruptedIOException ;
- }
-
- /**
- * limit the heapsize of total submitted data.
- * Reduce the limit of heapsize for submitting quickly
- * if there is no running task.
- */
- @VisibleForTesting
- static class SubmittedSizeChecker implements RowChecker {
- private final long maxHeapSizeSubmit;
- private long heapSize = 0;
- SubmittedSizeChecker(final long maxHeapSizeSubmit) {
- this.maxHeapSizeSubmit = maxHeapSizeSubmit;
- }
- @Override
- public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
- if (heapSize >= maxHeapSizeSubmit) {
- return ReturnCode.END;
- }
- return ReturnCode.INCLUDE;
- }
-
- @Override
- public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
- if (code == ReturnCode.INCLUDE) {
- heapSize += rowSize;
- }
- }
-
- @Override
- public void reset() {
- heapSize = 0;
- }
- }
- /**
- * limit the max number of tasks in an AsyncProcess.
- */
- @VisibleForTesting
- static class TaskCountChecker implements RowChecker {
- private static final long MAX_WAITING_TIME = 1000; //ms
- private final Set<HRegionInfo> regionsIncluded = new HashSet<>();
- private final Set<ServerName> serversIncluded = new HashSet<>();
- private final int maxConcurrentTasksPerRegion;
- private final int maxTotalConcurrentTasks;
- private final int maxConcurrentTasksPerServer;
- private final Map<byte[], AtomicInteger> taskCounterPerRegion;
- private final Map<ServerName, AtomicInteger> taskCounterPerServer;
- private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
- private final AtomicLong tasksInProgress;
- TaskCountChecker(final int maxTotalConcurrentTasks,
- final int maxConcurrentTasksPerServer,
- final int maxConcurrentTasksPerRegion,
- final AtomicLong tasksInProgress,
- final Map<ServerName, AtomicInteger> taskCounterPerServer,
- final Map<byte[], AtomicInteger> taskCounterPerRegion) {
- this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
- this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
- this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
- this.taskCounterPerRegion = taskCounterPerRegion;
- this.taskCounterPerServer = taskCounterPerServer;
- this.tasksInProgress = tasksInProgress;
- }
- @Override
- public void reset() throws InterruptedIOException {
- // prevent the busy-waiting
- waitForRegion();
- regionsIncluded.clear();
- serversIncluded.clear();
- busyRegions.clear();
- }
- private void waitForRegion() throws InterruptedIOException {
- if (busyRegions.isEmpty()) {
- return;
- }
- EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
- final long start = ee.currentTime();
- while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
- for (byte[] region : busyRegions) {
- AtomicInteger count = taskCounterPerRegion.get(region);
- if (count == null || count.get() < maxConcurrentTasksPerRegion) {
- return;
- }
- }
- try {
- synchronized (tasksInProgress) {
- tasksInProgress.wait(10);
- }
- } catch (InterruptedException e) {
- throw new InterruptedIOException("Interrupted." +
- " tasksInProgress=" + tasksInProgress);
- }
- }
- }
- /**
- * 1) check the regions is allowed.
- * 2) check the concurrent tasks for regions.
- * 3) check the total concurrent tasks.
- * 4) check the concurrent tasks for server.
- * @param loc
- * @param rowSize
- * @return
- */
- @Override
- public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
-
- HRegionInfo regionInfo = loc.getRegionInfo();
- if (regionsIncluded.contains(regionInfo)) {
- // We already know what to do with this region.
- return ReturnCode.INCLUDE;
- }
- AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
- if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
- // Too many tasks on this region already.
- return ReturnCode.SKIP;
- }
- int newServers = serversIncluded.size()
- + (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
- if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
- // Too many tasks.
- return ReturnCode.SKIP;
- }
- AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
- if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) {
- // Too many tasks for this individual server
- return ReturnCode.SKIP;
- }
- return ReturnCode.INCLUDE;
- }
-
- @Override
- public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
- if (code == ReturnCode.INCLUDE) {
- regionsIncluded.add(loc.getRegionInfo());
- serversIncluded.add(loc.getServerName());
- }
- busyRegions.add(loc.getRegionInfo().getRegionName());
- }
- }
-
- /**
- * limit the request size for each regionserver.
- */
- @VisibleForTesting
- static class RequestSizeChecker implements RowChecker {
- private final long maxHeapSizePerRequest;
- private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();
- RequestSizeChecker(final long maxHeapSizePerRequest) {
- this.maxHeapSizePerRequest = maxHeapSizePerRequest;
- }
- @Override
- public void reset() {
- serverRequestSizes.clear();
- }
- @Override
- public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
- // Is it ok for limit of request size?
- long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ?
- serverRequestSizes.get(loc.getServerName()) : 0L;
- // accept at least one request
- if (currentRequestSize == 0 || currentRequestSize + rowSize <= maxHeapSizePerRequest) {
- return ReturnCode.INCLUDE;
- }
- return ReturnCode.SKIP;
- }
-
- @Override
- public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
- if (code == ReturnCode.INCLUDE) {
- long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName()) ?
- serverRequestSizes.get(loc.getServerName()) : 0L;
- serverRequestSizes.put(loc.getServerName(), currentRequestSize + rowSize);
- }
- }
- }
-
- public static class ListRowAccess<T> implements RowAccess<T> {
- private final List<T> data;
- ListRowAccess(final List<T> data) {
- this.data = data;
- }
-
- @Override
- public int size() {
- return data.size();
- }
-
- @Override
- public boolean isEmpty() {
- return data.isEmpty();
- }
-
- @Override
- public Iterator<T> iterator() {
- return data.iterator();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java
new file mode 100644
index 0000000..eda1db2
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcessTask.java
@@ -0,0 +1,229 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+
+/**
+ * Contains the attributes of a task which will be executed
+ * by {@link org.apache.hadoop.hbase.client.AsyncProcess}.
+ * The attributes will be validated by AsyncProcess.
+ * It's intended for advanced client applications.
+ * @param <T> The type of response from server-side
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class AsyncProcessTask<T> {
+ /**
+ * The number of processed rows.
+ * The AsyncProcess has traffic control which may reject some rows.
+ */
+ public enum SubmittedRows {
+ ALL,
+ AT_LEAST_ONE,
+ NORMAL
+ }
+ public static <T> Builder<T> newBuilder(final Batch.Callback<T> callback) {
+ return new Builder<>(callback);
+ }
+ /**
+ * Creates a builder without a callback.
+ * NOTE(review): this overload returns the raw {@code Builder} type, so calls chained on it
+ * are unchecked; confirm whether {@code <T> Builder<T>} was intended.
+ * @return a new builder whose callback is left unset
+ */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder<T> {
+
+ private ExecutorService pool;
+ private TableName tableName;
+ private RowAccess<? extends Row> rows;
+ private SubmittedRows submittedRows = SubmittedRows.ALL;
+ private Batch.Callback<T> callback;
+ private boolean needResults;
+ private int rpcTimeout;
+ private int operationTimeout;
+ private CancellableRegionServerCallable callable;
+ private Object[] results;
+
+ private Builder() {
+ }
+
+ private Builder(Batch.Callback<T> callback) {
+ this.callback = callback;
+ }
+
+ /**
+ * Sets the caller-provided results array carried by the task.
+ * A non-null, non-empty array also switches {@code needResults} on;
+ * a null or empty array leaves that flag untouched.
+ * @param results the results array, may be null
+ * @return this builder
+ */
+ Builder<T> setResults(Object[] results) {
+ this.results = results;
+ if (results != null && results.length != 0) {
+ setNeedResults(true);
+ }
+ return this;
+ }
+
+ public Builder<T> setPool(ExecutorService pool) {
+ this.pool = pool;
+ return this;
+ }
+
+ public Builder<T> setRpcTimeout(int rpcTimeout) {
+ this.rpcTimeout = rpcTimeout;
+ return this;
+ }
+
+ public Builder<T> setOperationTimeout(int operationTimeout) {
+ this.operationTimeout = operationTimeout;
+ return this;
+ }
+
+ public Builder<T> setTableName(TableName tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ public Builder<T> setRowAccess(List<? extends Row> rows) {
+ this.rows = new ListRowAccess<>(rows);
+ return this;
+ }
+
+ public Builder<T> setRowAccess(RowAccess<? extends Row> rows) {
+ this.rows = rows;
+ return this;
+ }
+
+ public Builder<T> setSubmittedRows(SubmittedRows submittedRows) {
+ this.submittedRows = submittedRows;
+ return this;
+ }
+
+ public Builder<T> setNeedResults(boolean needResults) {
+ this.needResults = needResults;
+ return this;
+ }
+
+ Builder<T> setCallable(CancellableRegionServerCallable callable) {
+ this.callable = callable;
+ return this;
+ }
+
+ public AsyncProcessTask<T> build() {
+ return new AsyncProcessTask<>(pool, tableName, rows, submittedRows,
+ callback, callable, needResults, rpcTimeout, operationTimeout, results);
+ }
+ }
+ private final ExecutorService pool;
+ private final TableName tableName;
+ private final RowAccess<? extends Row> rows;
+ private final SubmittedRows submittedRows;
+ private final Batch.Callback<T> callback;
+ private final CancellableRegionServerCallable callable;
+ private final boolean needResults;
+ private final int rpcTimeout;
+ private final int operationTimeout;
+ private final Object[] results;
+ /**
+ * Copy constructor. Every attribute is read from {@code task} through its getters
+ * and passed on to the all-arguments constructor.
+ * @param task the task whose attributes are copied
+ */
+ AsyncProcessTask(AsyncProcessTask<T> task) {
+ this(task.getPool(), task.getTableName(), task.getRowAccess(),
+ task.getSubmittedRows(), task.getCallback(), task.getCallable(),
+ task.getNeedResults(), task.getRpcTimeout(), task.getOperationTimeout(),
+ task.getResults());
+ }
+ AsyncProcessTask(ExecutorService pool, TableName tableName,
+ RowAccess<? extends Row> rows, SubmittedRows size, Batch.Callback<T> callback,
+ CancellableRegionServerCallable callable, boolean needResults,
+ int rpcTimeout, int operationTimeout, Object[] results) {
+ this.pool = pool;
+ this.tableName = tableName;
+ this.rows = rows;
+ this.submittedRows = size;
+ this.callback = callback;
+ this.callable = callable;
+ this.needResults = needResults;
+ this.rpcTimeout = rpcTimeout;
+ this.operationTimeout = operationTimeout;
+ this.results = results;
+ }
+
+ public int getOperationTimeout() {
+ return operationTimeout;
+ }
+
+ public ExecutorService getPool() {
+ return pool;
+ }
+
+ public TableName getTableName() {
+ return tableName;
+ }
+
+ public RowAccess<? extends Row> getRowAccess() {
+ return rows;
+ }
+
+ public SubmittedRows getSubmittedRows() {
+ return submittedRows;
+ }
+
+ public Batch.Callback<T> getCallback() {
+ return callback;
+ }
+
+ CancellableRegionServerCallable getCallable() {
+ return callable;
+ }
+
+ Object[] getResults() {
+ return results;
+ }
+
+ public boolean getNeedResults() {
+ return needResults;
+ }
+
+ public int getRpcTimeout() {
+ return rpcTimeout;
+ }
+
+ /**
+ * A {@link RowAccess} backed by a {@link List}. The list is held by reference (not copied)
+ * and {@code size()}, {@code isEmpty()} and {@code iterator()} are forwarded to it.
+ * @param <T> the element type of the wrapped list
+ */
+ static class ListRowAccess<T> implements RowAccess<T> {
+
+ private final List<T> data;
+
+ ListRowAccess(final List<T> data) {
+ this.data = data;
+ }
+
+ @Override
+ public int size() {
+ return data.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return data.isEmpty();
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return data.iterator();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index d176ce1..036196e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -300,11 +300,11 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
private final int[] replicaGetIndices;
private final boolean hasAnyReplicaGets;
private final long nonceGroup;
- private CancellableRegionServerCallable currentCallable;
- private int operationTimeout;
- private int rpcTimeout;
+ private final CancellableRegionServerCallable currentCallable;
+ private final int operationTimeout;
+ private final int rpcTimeout;
private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
- protected AsyncProcess asyncProcess;
+ private final AsyncProcess asyncProcess;
/**
* For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only
@@ -339,32 +339,27 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
}
}
-
-
- public AsyncRequestFutureImpl(TableName tableName, List<Action> actions, long nonceGroup,
- ExecutorService pool, boolean needResults, Object[] results, Batch.Callback<CResult> callback,
- CancellableRegionServerCallable callable, int operationTimeout, int rpcTimeout,
- AsyncProcess asyncProcess) {
- this.pool = pool;
- this.callback = callback;
+ public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
+ long nonceGroup, AsyncProcess asyncProcess) {
+ this.pool = task.getPool();
+ this.callback = task.getCallback();
this.nonceGroup = nonceGroup;
- this.tableName = tableName;
+ this.tableName = task.getTableName();
this.actionsInProgress.set(actions.size());
- if (results != null) {
- assert needResults;
- if (results.length != actions.size()) {
+ if (task.getResults() == null) {
+ results = task.getNeedResults() ? new Object[actions.size()] : null;
+ } else {
+ if (task.getResults().length != actions.size()) {
throw new AssertionError("results.length");
}
- this.results = results;
+ this.results = task.getResults();
for (int i = 0; i != this.results.length; ++i) {
results[i] = null;
}
- } else {
- this.results = needResults ? new Object[actions.size()] : null;
}
List<Integer> replicaGetIndices = null;
boolean hasAnyReplicaGets = false;
- if (needResults) {
+ if (results != null) {
// Check to see if any requests might require replica calls.
// We expect that many requests will consist of all or no multi-replica gets; in such
// cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
@@ -414,10 +409,10 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
this.errorsByServer = createServerErrorTracker();
this.errors = (asyncProcess.globalErrors != null)
? asyncProcess.globalErrors : new BatchErrors();
- this.operationTimeout = operationTimeout;
- this.rpcTimeout = rpcTimeout;
- this.currentCallable = callable;
- if (callable == null) {
+ this.operationTimeout = task.getOperationTimeout();
+ this.rpcTimeout = task.getRpcTimeout();
+ this.currentCallable = task.getCallable();
+ if (task.getCallable() == null) {
tracker = new RetryingTimeTracker().start();
}
}
@@ -1246,9 +1241,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
lastLog = now;
LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress
+ " actions to finish on table: " + tableName);
- if (currentInProgress <= asyncProcess.thresholdToLogUndoneTaskDetails) {
- asyncProcess.logDetailsOfUndoneTasks(currentInProgress);
- }
}
}
synchronized (actionsInProgress) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 0085767..2a55de9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -19,12 +19,9 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
@@ -36,6 +33,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
/**
* <p>
@@ -67,61 +66,70 @@ public class BufferedMutatorImpl implements BufferedMutator {
"hbase.client.bufferedmutator.classname";
private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);
-
+
private final ExceptionListener listener;
- protected ClusterConnection connection; // non-final so can be overridden in test
private final TableName tableName;
- private volatile Configuration conf;
-
- @VisibleForTesting
- final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<Mutation>();
- @VisibleForTesting
- AtomicLong currentWriteBufferSize = new AtomicLong(0);
+ private final Configuration conf;
+ private final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<>();
+ private final AtomicLong currentWriteBufferSize = new AtomicLong(0);
/**
* Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}.
* The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation.
*/
- @VisibleForTesting
- AtomicInteger undealtMutationCount = new AtomicInteger(0);
- private long writeBufferSize;
+ private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
+ private volatile long writeBufferSize;
private final int maxKeyValueSize;
- private boolean closed = false;
private final ExecutorService pool;
- private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
- private int operationTimeout;
+ private final AtomicInteger rpcTimeout;
+ private final AtomicInteger operationTimeout;
+ private final boolean cleanupPoolOnClose;
+ private volatile boolean closed = false;
+ private final AsyncProcess ap;
@VisibleForTesting
- protected AsyncProcess ap; // non-final so can be overridden in test
-
- BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
- RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
+ BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) {
if (conn == null || conn.isClosed()) {
throw new IllegalArgumentException("Connection is null or closed.");
}
-
this.tableName = params.getTableName();
- this.connection = conn;
- this.conf = connection.getConfiguration();
- this.pool = params.getPool();
+ this.conf = conn.getConfiguration();
this.listener = params.getListener();
-
+ if (params.getPool() == null) {
+ this.pool = HTable.getDefaultExecutor(conf);
+ cleanupPoolOnClose = true;
+ } else {
+ this.pool = params.getPool();
+ cleanupPoolOnClose = false;
+ }
ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
params.getWriteBufferSize() : tableConf.getWriteBufferSize();
this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
- this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
- conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
- this.operationTimeout = conn.getConfiguration().getInt(
- HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
- // puts need to track errors globally due to how the APIs currently work.
- ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory,
- writeRpcTimeout, operationTimeout);
+ this.rpcTimeout = new AtomicInteger(params.getRpcTimeout() != BufferedMutatorParams.UNSET ?
+ params.getRpcTimeout() : conn.getConnectionConfiguration().getWriteRpcTimeout());
+ this.operationTimeout = new AtomicInteger(params.getOperationTimeout()!= BufferedMutatorParams.UNSET ?
+ params.getOperationTimeout() : conn.getConnectionConfiguration().getOperationTimeout());
+ this.ap = ap;
+ }
+ BufferedMutatorImpl(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
+ RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
+ this(conn, params,
+ // puts need to track errors globally due to how the APIs currently work.
+ new AsyncProcess(conn, conn.getConfiguration(), rpcCallerFactory, true, rpcFactory));
+ }
+
+ @VisibleForTesting
+ ExecutorService getPool() {
+ return pool;
+ }
+
+ @VisibleForTesting
+ AsyncProcess getAsyncProcess() {
+ return ap;
}
@Override
@@ -193,22 +201,22 @@ public class BufferedMutatorImpl implements BufferedMutator {
// As we can have an operation in progress even if the buffer is empty, we call
// backgroundFlushCommits at least one time.
backgroundFlushCommits(true);
- this.pool.shutdown();
- boolean terminated;
- int loopCnt = 0;
- do {
- // wait until the pool has terminated
- terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
- loopCnt += 1;
- if (loopCnt >= 10) {
- LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
- break;
- }
- } while (!terminated);
-
+ if (cleanupPoolOnClose) {
+ this.pool.shutdown();
+ boolean terminated;
+ int loopCnt = 0;
+ do {
+ // wait until the pool has terminated
+ terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
+ loopCnt += 1;
+ if (loopCnt >= 10) {
+ LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool.");
+ break;
+ }
+ } while (!terminated);
+ }
} catch (InterruptedException e) {
LOG.warn("waitForTermination interrupted");
-
} finally {
this.closed = true;
}
@@ -239,8 +247,9 @@ public class BufferedMutatorImpl implements BufferedMutator {
if (!synchronous) {
QueueRowAccess taker = new QueueRowAccess();
+ AsyncProcessTask task = wrapAsyncProcessTask(taker);
try {
- ap.submit(tableName, taker, true, null, false);
+ ap.submit(task);
if (ap.hasError()) {
LOG.debug(tableName + ": One or more of the operations have failed -"
+ " waiting for all operation in progress to finish (successfully or not)");
@@ -251,17 +260,17 @@ public class BufferedMutatorImpl implements BufferedMutator {
}
if (synchronous || ap.hasError()) {
QueueRowAccess taker = new QueueRowAccess();
+ AsyncProcessTask task = wrapAsyncProcessTask(taker);
try {
while (!taker.isEmpty()) {
- ap.submit(tableName, taker, true, null, false);
+ ap.submit(task);
taker.reset();
}
} finally {
taker.restoreRemainder();
}
-
RetriesExhaustedWithDetailsException error =
- ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
+ ap.waitForAllPreviousOpsAndReset(null, tableName);
if (error != null) {
if (listener == null) {
throw error;
@@ -273,8 +282,38 @@ public class BufferedMutatorImpl implements BufferedMutator {
}
/**
+ * Reuse the AsyncProcessTask when calling {@link BufferedMutatorImpl#backgroundFlushCommits(boolean)}.
+ * @param taker access the inner buffer.
+ * @return An AsyncProcessTask which always returns the latest rpc and operation timeout.
+ */
+ private AsyncProcessTask wrapAsyncProcessTask(QueueRowAccess taker) {
+ // The builder is not given any timeout; both timeouts come from the overrides below.
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(taker)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
+ .build();
+ // Wrap the built task so the timeout getters read the AtomicInteger fields on every call:
+ // a value changed later through setRpcTimeout/setOperationTimeout is seen by this task too.
+ return new AsyncProcessTask(task) {
+ @Override
+ public int getRpcTimeout() {
+ return rpcTimeout.get();
+ }
+
+ @Override
+ public int getOperationTimeout() {
+ return operationTimeout.get();
+ }
+ };
+ }
+ /**
* This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought
* not be called for production uses.
+ * If the new buffer size is smaller than the stored data, the {@link BufferedMutatorImpl#flush()}
+ * will be called.
+ * @param writeBufferSize The max size of internal buffer where data is stored.
+ * @throws org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException
+ * if an I/O error occurs and there are too many retries.
+ * @throws java.io.InterruptedIOException if the I/O task is interrupted.
* @deprecated Going away when we drop public support for {@link HTable}.
*/
@Deprecated
@@ -295,15 +334,23 @@ public class BufferedMutatorImpl implements BufferedMutator {
}
@Override
- public void setRpcTimeout(int timeout) {
- this.writeRpcTimeout = timeout;
- ap.setRpcTimeout(timeout);
+ public void setRpcTimeout(int rpcTimeout) {
+ this.rpcTimeout.set(rpcTimeout);
}
@Override
- public void setOperationTimeout(int timeout) {
- this.operationTimeout = timeout;
- ap.setOperationTimeout(operationTimeout);
+ public void setOperationTimeout(int operationTimeout) {
+ this.operationTimeout.set(operationTimeout);
+ }
+
+ @VisibleForTesting
+ long getCurrentWriteBufferSize() {
+ return currentWriteBufferSize.get();
+ }
+
+ @VisibleForTesting
+ int size() {
+ return undealtMutationCount.get();
}
private class QueueRowAccess implements RowAccess<Row> {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
index 17c69ec..9c901e2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java
@@ -39,7 +39,8 @@ public class BufferedMutatorParams implements Cloneable {
private int maxKeyValueSize = UNSET;
private ExecutorService pool = null;
private String implementationClassName = null;
-
+ private int rpcTimeout = UNSET;
+ private int operationTimeout = UNSET;
private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
public void onException(RetriesExhaustedWithDetailsException exception,
@@ -61,6 +62,24 @@ public class BufferedMutatorParams implements Cloneable {
return writeBufferSize;
}
+ /**
+ * Sets the rpc timeout for the mutator built from these params.
+ * When left at {@code UNSET}, {@link BufferedMutatorImpl} falls back to the write rpc timeout
+ * of the connection's configuration.
+ * @param rpcTimeout the rpc timeout to use
+ * @return this params object, for chaining
+ */
+ public BufferedMutatorParams rpcTimeout(final int rpcTimeout) {
+ this.rpcTimeout = rpcTimeout;
+ return this;
+ }
+
+ public int getRpcTimeout() {
+ return rpcTimeout;
+ }
+
+ /**
+ * Sets the operation timeout for the mutator built from these params.
+ * @param operationTimeout the operation timeout to use
+ * @return this params object, for chaining
+ * @deprecated the method name is misspelled; use {@link #operationTimeout(int)} instead.
+ */
+ @Deprecated
+ public BufferedMutatorParams opertationTimeout(final int operationTimeout) {
+ return operationTimeout(operationTimeout);
+ }
+
+ /**
+ * Sets the operation timeout for the mutator built from these params.
+ * When left at {@code UNSET}, {@link BufferedMutatorImpl} falls back to the operation timeout
+ * of the connection's configuration.
+ * @param operationTimeout the operation timeout to use
+ * @return this params object, for chaining
+ */
+ public BufferedMutatorParams operationTimeout(final int operationTimeout) {
+ this.operationTimeout = operationTimeout;
+ return this;
+ }
+
+ public int getOperationTimeout() {
+ return operationTimeout;
+ }
+
/**
* Override the write buffer size specified by the provided {@link Connection}'s
* {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 35bebae..41f5baf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -42,7 +42,8 @@ public class ConnectionConfiguration {
private final int replicaCallTimeoutMicroSecondScan;
private final int retries;
private final int maxKeyValueSize;
-
+ private final int readRpcTimeout;
+ private final int writeRpcTimeout;
// toggle for async/sync prefetch
private final boolean clientScannerAsyncPrefetch;
@@ -80,6 +81,12 @@ public class ConnectionConfiguration {
Scan.HBASE_CLIENT_SCANNER_ASYNC_PREFETCH, Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH);
this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
+
+ this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+ conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+
+ this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+ conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
}
/**
@@ -99,6 +106,16 @@ public class ConnectionConfiguration {
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH;
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
+ this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+ this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+ }
+
+ public int getReadRpcTimeout() {
+ return readRpcTimeout;
+ }
+
+ public int getWriteRpcTimeout() {
+ return writeRpcTimeout;
}
public long getWriteBufferSize() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index a597be3..ceac3fb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -249,7 +249,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
- this.asyncProcess = createAsyncProcess(this.conf);
+ this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, false, rpcControllerFactory);
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
this.metrics = new MetricsConnection(this);
} else {
@@ -1833,17 +1833,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
metaCache.clearCache(regionInfo);
}
- // For tests to override.
- protected AsyncProcess createAsyncProcess(Configuration conf) {
- // No default pool available.
- int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
- int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
- return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory,
- rpcTimeout, operationTimeout);
- }
-
@Override
public AsyncProcess getAsyncProcess() {
return asyncProcess;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index dd11abf..fd5eda3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -103,27 +103,28 @@ import org.apache.hadoop.hbase.util.Threads;
@InterfaceStability.Stable
public class HTable implements Table {
private static final Log LOG = LogFactory.getLog(HTable.class);
- protected ClusterConnection connection;
+ private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG;
+ private final ClusterConnection connection;
private final TableName tableName;
- private volatile Configuration configuration;
- private ConnectionConfiguration connConfiguration;
- protected BufferedMutatorImpl mutator;
+ private final Configuration configuration;
+ private final ConnectionConfiguration connConfiguration;
+ @VisibleForTesting
+ BufferedMutatorImpl mutator;
private boolean closed = false;
- protected int scannerCaching;
- protected long scannerMaxResultSize;
- private ExecutorService pool; // For Multi & Scan
+ private final int scannerCaching;
+ private final long scannerMaxResultSize;
+ private final ExecutorService pool; // For Multi & Scan
private int operationTimeout; // global timeout for each blocking method with retrying rpc
private int readRpcTimeout; // timeout for each read rpc request
private int writeRpcTimeout; // timeout for each write rpc request
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
- private final boolean cleanupConnectionOnClose; // close the connection in close()
- private Consistency defaultConsistency = Consistency.STRONG;
- private HRegionLocator locator;
+ private final HRegionLocator locator;
/** The Async process for batch */
- protected AsyncProcess multiAp;
- private RpcRetryingCallerFactory rpcCallerFactory;
- private RpcControllerFactory rpcControllerFactory;
+ @VisibleForTesting
+ AsyncProcess multiAp;
+ private final RpcRetryingCallerFactory rpcCallerFactory;
+ private final RpcControllerFactory rpcControllerFactory;
// Marked Private @since 1.0
@InterfaceAudience.Private
@@ -167,22 +168,42 @@ public class HTable implements Table {
throw new IllegalArgumentException("Given table name is null");
}
this.tableName = tableName;
- this.cleanupConnectionOnClose = false;
this.connection = connection;
this.configuration = connection.getConfiguration();
- this.connConfiguration = tableConfig;
- this.pool = pool;
+ if (tableConfig == null) {
+ connConfiguration = new ConnectionConfiguration(configuration);
+ } else {
+ connConfiguration = tableConfig;
+ }
if (pool == null) {
this.pool = getDefaultExecutor(this.configuration);
this.cleanupPoolOnClose = true;
} else {
+ this.pool = pool;
this.cleanupPoolOnClose = false;
}
+ if (rpcCallerFactory == null) {
+ this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
+ } else {
+ this.rpcCallerFactory = rpcCallerFactory;
+ }
- this.rpcCallerFactory = rpcCallerFactory;
- this.rpcControllerFactory = rpcControllerFactory;
+ if (rpcControllerFactory == null) {
+ this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
+ } else {
+ this.rpcControllerFactory = rpcControllerFactory;
+ }
+
+ this.operationTimeout = tableName.isSystemTable() ?
+ connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
+ this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
+ this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
+ this.scannerCaching = connConfiguration.getScannerCaching();
+ this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
- this.finishSetup();
+ // puts need to track errors globally due to how the APIs currently work.
+ multiAp = this.connection.getAsyncProcess();
+ this.locator = new HRegionLocator(tableName, connection);
}
/**
@@ -190,20 +211,23 @@ public class HTable implements Table {
* @throws IOException
*/
@VisibleForTesting
- protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException {
+ protected HTable(ClusterConnection conn, BufferedMutatorImpl mutator) throws IOException {
connection = conn;
- tableName = params.getTableName();
- connConfiguration = new ConnectionConfiguration(connection.getConfiguration());
+ this.tableName = mutator.getName();
+ this.configuration = connection.getConfiguration();
+ connConfiguration = new ConnectionConfiguration(configuration);
cleanupPoolOnClose = false;
- cleanupConnectionOnClose = false;
- // used from tests, don't trust the connection is real
- this.mutator = new BufferedMutatorImpl(conn, null, null, params);
- this.readRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
- conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
- this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
- conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+ this.mutator = mutator;
+ this.operationTimeout = tableName.isSystemTable() ?
+ connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
+ this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
+ this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
+ this.scannerCaching = connConfiguration.getScannerCaching();
+ this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
+ this.rpcControllerFactory = null;
+ this.rpcCallerFactory = null;
+ this.pool = mutator.getPool();
+ this.locator = null;
}
/**
@@ -214,36 +238,6 @@ public class HTable implements Table {
}
/**
- * setup this HTable's parameter based on the passed configuration
- */
- private void finishSetup() throws IOException {
- if (connConfiguration == null) {
- connConfiguration = new ConnectionConfiguration(configuration);
- }
-
- this.operationTimeout = tableName.isSystemTable() ?
- connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
- this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
- configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
- this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
- configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
- this.scannerCaching = connConfiguration.getScannerCaching();
- this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
- if (this.rpcCallerFactory == null) {
- this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
- }
- if (this.rpcControllerFactory == null) {
- this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
- }
-
- // puts need to track errors globally due to how the APIs currently work.
- multiAp = this.connection.getAsyncProcess();
- this.locator = new HRegionLocator(getName(), connection);
- }
-
- /**
* {@inheritDoc}
*/
@Override
@@ -423,7 +417,7 @@ public class HTable implements Table {
get = ReflectionUtils.newInstance(get.getClass(), get);
get.setCheckExistenceOnly(checkExistenceOnly);
if (get.getConsistency() == null){
- get.setConsistency(defaultConsistency);
+ get.setConsistency(DEFAULT_CONSISTENCY);
}
}
@@ -483,13 +477,37 @@ public class HTable implements Table {
@Override
public void batch(final List<? extends Row> actions, final Object[] results)
throws InterruptedException, IOException {
- batch(actions, results, -1);
+ int rpcTimeout = writeRpcTimeout;
+ boolean hasRead = false;
+ boolean hasWrite = false;
+ for (Row action : actions) {
+ if (action instanceof Mutation) {
+ hasWrite = true;
+ } else {
+ hasRead = true;
+ }
+ if (hasRead && hasWrite) {
+ break;
+ }
+ }
+ if (hasRead && !hasWrite) {
+ rpcTimeout = readRpcTimeout;
+ }
+ batch(actions, results, rpcTimeout);
}
public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
throws InterruptedException, IOException {
- AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null,
- rpcTimeout);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(actions)
+ .setResults(results)
+ .setRpcTimeout(rpcTimeout)
+ .setOperationTimeout(operationTimeout)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -509,8 +527,20 @@ public class HTable implements Table {
public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[] results,
Callback<R> callback, ClusterConnection connection, ExecutorService pool, TableName tableName)
throws InterruptedIOException, RetriesExhaustedWithDetailsException {
- AsyncRequestFuture ars = connection.getAsyncProcess().submitAll(
- pool, tableName, actions, callback, results);
+ int operationTimeout = connection.getConnectionConfiguration().getOperationTimeout();
+ int writeTimeout = connection.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+ connection.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+ AsyncProcessTask<R> task = AsyncProcessTask.newBuilder(callback)
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(actions)
+ .setResults(results)
+ .setOperationTimeout(operationTimeout)
+ .setRpcTimeout(writeTimeout)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -536,8 +566,16 @@ public class HTable implements Table {
}
};
List<Delete> rows = Collections.singletonList(delete);
- AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
- null, null, callable, writeRpcTimeout);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(rows)
+ .setCallable(callable)
+ .setRpcTimeout(writeRpcTimeout)
+ .setOperationTimeout(operationTimeout)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -615,8 +653,16 @@ public class HTable implements Table {
return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
}
};
- AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
- null, null, callable, writeRpcTimeout);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(rm.getMutations())
+ .setCallable(callable)
+ .setRpcTimeout(writeRpcTimeout)
+ .setOperationTimeout(operationTimeout)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -795,8 +841,18 @@ public class HTable implements Table {
};
List<Delete> rows = Collections.singletonList(delete);
Object[] results = new Object[1];
- AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
- null, results, callable, -1);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(rows)
+ .setCallable(callable)
+ // TODO any better timeout?
+ .setRpcTimeout(Math.max(readRpcTimeout, writeRpcTimeout))
+ .setOperationTimeout(operationTimeout)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+ .setResults(results)
+ .build();
+ AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -839,8 +895,18 @@ public class HTable implements Table {
* It is excessive to send such a large array, but that is required by the framework right now
* */
Object[] results = new Object[rm.getMutations().size()];
- AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
- null, results, callable, -1);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(rm.getMutations())
+ .setResults(results)
+ .setCallable(callable)
+ // TODO any better timeout?
+ .setRpcTimeout(Math.max(readRpcTimeout, writeRpcTimeout))
+ .setOperationTimeout(operationTimeout)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -926,6 +992,10 @@ public class HTable implements Table {
return;
}
flushCommits();
+ if (mutator != null) {
+ mutator.close();
+ mutator = null;
+ }
if (cleanupPoolOnClose) {
this.pool.shutdown();
try {
@@ -939,11 +1009,6 @@ public class HTable implements Table {
LOG.warn("waitForTermination interrupted");
}
}
- if (cleanupConnectionOnClose) {
- if (this.connection != null) {
- this.connection.close();
- }
- }
this.closed = true;
}
@@ -1102,7 +1167,6 @@ public class HTable implements Table {
if (mutator != null) {
mutator.setOperationTimeout(operationTimeout);
}
- multiAp.setOperationTimeout(operationTimeout);
}
@Override
@@ -1134,7 +1198,6 @@ public class HTable implements Table {
if (mutator != null) {
mutator.setRpcTimeout(writeRpcTimeout);
}
- multiAp.setRpcTimeout(writeRpcTimeout);
}
@Override
@@ -1217,37 +1280,41 @@ public class HTable implements Table {
Object[] results = new Object[execs.size()];
AsyncProcess asyncProcess =
- new AsyncProcess(connection, configuration, pool,
+ new AsyncProcess(connection, configuration,
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
- true, RpcControllerFactory.instantiate(configuration), readRpcTimeout,
- operationTimeout);
-
- AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs,
- new Callback<ClientProtos.CoprocessorServiceResult>() {
- @Override
- public void update(byte[] region, byte[] row,
- ClientProtos.CoprocessorServiceResult serviceResult) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
- ": region=" + Bytes.toStringBinary(region) +
- ", row=" + Bytes.toStringBinary(row) +
- ", value=" + serviceResult.getValue().getValue());
- }
- try {
- Message.Builder builder = responsePrototype.newBuilderForType();
- org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
- serviceResult.getValue().getValue().toByteArray());
- callback.update(region, row, (R) builder.build());
- } catch (IOException e) {
- LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
- e);
- callbackErrorExceptions.add(e);
- callbackErrorActions.add(execsByRow.get(row));
- callbackErrorServers.add("null");
- }
- }
- }, results);
-
+ true, RpcControllerFactory.instantiate(configuration));
+
+ Callback<ClientProtos.CoprocessorServiceResult> resultsCallback
+ = (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
+ ": region=" + Bytes.toStringBinary(region) +
+ ", row=" + Bytes.toStringBinary(row) +
+ ", value=" + serviceResult.getValue().getValue());
+ }
+ try {
+ Message.Builder builder = responsePrototype.newBuilderForType();
+ org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
+ serviceResult.getValue().getValue().toByteArray());
+ callback.update(region, row, (R) builder.build());
+ } catch (IOException e) {
+ LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
+ e);
+ callbackErrorExceptions.add(e);
+ callbackErrorActions.add(execsByRow.get(row));
+ callbackErrorServers.add("null");
+ }
+ };
+ AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task = AsyncProcessTask.newBuilder(resultsCallback)
+ .setPool(pool)
+ .setTableName(tableName)
+ .setRowAccess(execs)
+ .setResults(results)
+ .setRpcTimeout(readRpcTimeout)
+ .setOperationTimeout(operationTimeout)
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture future = asyncProcess.submit(task);
future.waitUntilDone();
if (future.hasError()) {
@@ -1270,10 +1337,10 @@ public class HTable implements Table {
.pool(pool)
.writeBufferSize(connConfiguration.getWriteBufferSize())
.maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
+ .opertationTimeout(operationTimeout)
+ .rpcTimeout(writeRpcTimeout)
);
}
- mutator.setRpcTimeout(writeRpcTimeout);
- mutator.setOperationTimeout(operationTimeout);
return mutator;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 8ff64bf..c03b969 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -443,7 +443,7 @@ public class HTableMultiplexer {
private final AtomicInteger retryInQueue = new AtomicInteger(0);
private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
private final int operationTimeout;
-
+ private final ExecutorService pool;
public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
ExecutorService pool, ScheduledExecutorService executor) {
@@ -457,10 +457,10 @@ public class HTableMultiplexer {
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
- this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory,
- writeRpcTimeout, operationTimeout);
+ this.ap = new AsyncProcess(conn, conf, rpcCallerFactory, false, rpcControllerFactory);
this.executor = executor;
this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
+ this.pool = pool;
}
protected LinkedBlockingQueue<PutStatus> getQueue() {
@@ -594,9 +594,14 @@ public class HTableMultiplexer {
Map<ServerName, MultiAction> actionsByServer =
Collections.singletonMap(server, actions);
try {
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setResults(results)
+ .setPool(pool)
+ .setRpcTimeout(writeRpcTimeout)
+ .setOperationTimeout(operationTimeout)
+ .build();
AsyncRequestFuture arf =
- ap.submitMultiActions(null, retainedActions, 0L, null, results, true, null,
- null, actionsByServer, null);
+ ap.submitMultiActions(task, retainedActions, 0L, null, null, actionsByServer);
arf.waitUntilDone();
if (arf.hasError()) {
// We just log and ignore the exception here since failed Puts will be resubmit again.