You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/12/24 04:10:46 UTC
[2/3] hbase git commit: HBASE-17174 Refactor the AsyncProcess,
BufferedMutatorImpl, and HTable
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java
new file mode 100644
index 0000000..7e9c968
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.function.Consumer;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * An interface for client request scheduling algorithms.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface RequestController {
+
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public enum ReturnCode {
+    /**
+     * Accept current row.
+     */
+    INCLUDE,
+    /**
+     * Skip current row.
+     */
+    SKIP,
+    /**
+     * No more rows can be included.
+     */
+    END
+  }
+
+  /**
+   * Picks up the valid data.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public interface Checker {
+    /**
+     * Checks whether the data is valid to submit.
+     * @param loc the destination of the data
+     * @param row the data to check
+     * @return the decision for the row
+     */
+    ReturnCode canTakeRow(HRegionLocation loc, Row row);
+
+    /**
+     * Resets the state of the scheduler when completing the iteration of rows.
+     * @throws InterruptedIOException some controllers may wait
+     * for busy regions or region servers to finish outstanding requests
+     */
+    void reset() throws InterruptedIOException;
+  }
+
+  /**
+   * @return A new checker for evaluating a batch of rows.
+   */
+  Checker newChecker();
+
+  /**
+   * Increment the counter if we build a valid task.
+   * @param regions The destination of the task
+   * @param sn The target server
+   */
+  void incTaskCounters(Collection<byte[]> regions, ServerName sn);
+
+  /**
+   * Decrement the counter if a task is accomplished.
+   * @param regions The destination of the task
+   * @param sn The target server
+   */
+  void decTaskCounters(Collection<byte[]> regions, ServerName sn);
+
+  /**
+   * @return The number of running tasks. (The method name carries a
+   * historical typo -- "Tsks" -- kept for interface compatibility.)
+   */
+  long getNumberOfTsksInProgress();
+
+  /**
+   * Waits for the running tasks to complete.
+   * If there are specified threshold and trigger, the implementation should
+   * wake up once in a while for checking the threshold and calling the trigger.
+   * @param max This method will return if the number of running tasks is
+   * less than or equal to max.
+   * @param id the caller's id
+   * @param periodToTrigger The period to invoke the trigger. This value is a
+   * hint. The real period depends on the implementation.
+   * @param trigger The object to call periodically.
+   * @throws java.io.InterruptedIOException If the waiting is interrupted
+   */
+  void waitForMaximumCurrentTasks(long max, long id,
+    int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException;
+
+  /**
+   * Wait until there is at least one slot for a new task.
+   * @param id the caller's id
+   * @param periodToTrigger The period to invoke the trigger. This value is a
+   * hint. The real period depends on the implementation.
+   * @param trigger The object to call periodically.
+   * @throws java.io.InterruptedIOException If the waiting is interrupted
+   */
+  void waitForFreeSlot(long id, int periodToTrigger,
+    Consumer<Long> trigger) throws InterruptedIOException;
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestControllerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestControllerFactory.java
new file mode 100644
index 0000000..7ed80f0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestControllerFactory.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * A factory class that constructs an {@link org.apache.hadoop.hbase.client.RequestController}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class RequestControllerFactory {
+  /** Configuration key naming a custom {@link RequestController} implementation class. */
+  public static final String REQUEST_CONTROLLER_IMPL_CONF_KEY = "hbase.client.request.controller.impl";
+
+  private RequestControllerFactory() {
+    // Utility class: static factory only, no instances.
+  }
+
+  /**
+   * Constructs a {@link org.apache.hadoop.hbase.client.RequestController}.
+   * Falls back to {@link SimpleRequestController} when no custom class is configured.
+   * @param conf The {@link Configuration} to use.
+   * @return A RequestController which is built according to the configuration.
+   */
+  public static RequestController create(Configuration conf) {
+    Class<? extends RequestController> clazz = conf.getClass(REQUEST_CONTROLLER_IMPL_CONF_KEY,
+        SimpleRequestController.class, RequestController.class);
+    return ReflectionUtils.newInstance(clazz, conf);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
index 788f1a4..85fd590 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowAccess.java
@@ -30,8 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-@VisibleForTesting
-interface RowAccess<T> extends Iterable<T> {
+public interface RowAccess<T> extends Iterable<T> {
/**
* @return true if there are no elements.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java
new file mode 100644
index 0000000..473f264
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java
@@ -0,0 +1,519 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Holds back the request if the submitted size or number has reached the
+ * threshold.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class SimpleRequestController implements RequestController {
+ private static final Log LOG = LogFactory.getLog(SimpleRequestController.class);
+ /**
+ * The maximum heap size of a single request sent to one RegionServer.
+ */
+ public static final String HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = "hbase.client.max.perrequest.heapsize";
+
+ /**
+ * Default value of #HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE
+ */
+ @VisibleForTesting
+ static final long DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE = 4194304;
+
+ /**
+ * The maximum total heap size of a single submit.
+ */
+ public static final String HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = "hbase.client.max.submit.heapsize";
+ /**
+ * Default value of #HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE
+ */
+ @VisibleForTesting
+ static final long DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE = DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE;
+ @VisibleForTesting
+ final AtomicLong tasksInProgress = new AtomicLong(0);
+ @VisibleForTesting
+ final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion
+ = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+ @VisibleForTesting
+ final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer = new ConcurrentHashMap<>();
+ /**
+ * The number of tasks simultaneously executed on the cluster.
+ */
+ private final int maxTotalConcurrentTasks;
+
+ /**
+ * The max heap size of all tasks simultaneously executed on a server.
+ */
+ private final long maxHeapSizePerRequest;
+ private final long maxHeapSizeSubmit;
+ /**
+ * The number of tasks we run in parallel on a single region. With 1 (the
+ * default), we ensure that the ordering of the queries is respected: we
+ * don't start a set of operations on a region before the previous one is
+ * done. As well, this limits the pressure we put on the region server.
+ */
+ @VisibleForTesting
+ final int maxConcurrentTasksPerRegion;
+
+ /**
+ * The number of tasks simultaneously executed on a single region server.
+ */
+ @VisibleForTesting
+ final int maxConcurrentTasksPerServer;
+ private final int thresholdToLogUndoneTaskDetails;
+ public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
+ "hbase.client.threshold.log.details";
+ private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
+ public static final String THRESHOLD_TO_LOG_REGION_DETAILS =
+ "hbase.client.threshold.log.region.details";
+ private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2;
+ private final int thresholdToLogRegionDetails;
+ /**
+ * Reads all tuning knobs from the client configuration and validates that
+ * every limit is positive.
+ * @param conf client configuration supplying the task-count and heap-size limits
+ */
+ SimpleRequestController(final Configuration conf) {
+ this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+ HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
+ this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
+ HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
+ this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
+ HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
+ this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
+ DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
+ this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
+ this.thresholdToLogUndoneTaskDetails =
+ conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
+ DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
+ this.thresholdToLogRegionDetails =
+ conf.getInt(THRESHOLD_TO_LOG_REGION_DETAILS,
+ DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS);
+ if (this.maxTotalConcurrentTasks <= 0) {
+ throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
+ }
+ if (this.maxConcurrentTasksPerServer <= 0) {
+ throw new IllegalArgumentException("maxConcurrentTasksPerServer="
+ + maxConcurrentTasksPerServer);
+ }
+ if (this.maxConcurrentTasksPerRegion <= 0) {
+ throw new IllegalArgumentException("maxConcurrentTasksPerRegion="
+ + maxConcurrentTasksPerRegion);
+ }
+ if (this.maxHeapSizePerRequest <= 0) {
+ // NOTE(review): the message says "maxHeapSizePerServer" but the value
+ // checked is maxHeapSizePerRequest -- message text kept as-is.
+ throw new IllegalArgumentException("maxHeapSizePerServer="
+ + maxHeapSizePerRequest);
+ }
+
+ if (this.maxHeapSizeSubmit <= 0) {
+ throw new IllegalArgumentException("maxHeapSizeSubmit="
+ + maxHeapSizeSubmit);
+ }
+ }
+
+ /**
+ * Composes several RowCheckers into a single Checker: an END from any
+ * checker ends the batch; otherwise a SKIP from any checker demotes the
+ * row. Every checker is then notified of the final decision.
+ * @param checkers the per-constraint checkers to combine
+ * @return a Checker backed by the given list
+ */
+ @VisibleForTesting
+ static Checker newChecker(List<RowChecker> checkers) {
+ return new Checker() {
+ private boolean isEnd = false;
+
+ @Override
+ public ReturnCode canTakeRow(HRegionLocation loc, Row row) {
+ if (isEnd) {
+ return ReturnCode.END;
+ }
+ // Only Mutations contribute heap size; other Row types count as zero.
+ long rowSize = (row instanceof Mutation) ? ((Mutation) row).heapSize() : 0;
+ ReturnCode code = ReturnCode.INCLUDE;
+ for (RowChecker checker : checkers) {
+ switch (checker.canTakeOperation(loc, rowSize)) {
+ case END:
+ isEnd = true;
+ code = ReturnCode.END;
+ break;
+ case SKIP:
+ code = ReturnCode.SKIP;
+ break;
+ case INCLUDE:
+ default:
+ break;
+ }
+ if (code == ReturnCode.END) {
+ break;
+ }
+ }
+ // All checkers see the final decision so they can update inner state.
+ for (RowChecker checker : checkers) {
+ checker.notifyFinal(code, loc, rowSize);
+ }
+ return code;
+ }
+
+ @Override
+ public void reset() throws InterruptedIOException {
+ isEnd = false;
+ InterruptedIOException e = null;
+ // Reset every checker even if one is interrupted; rethrow afterwards.
+ for (RowChecker checker : checkers) {
+ try {
+ checker.reset();
+ } catch (InterruptedIOException ex) {
+ e = ex;
+ }
+ }
+ if (e != null) {
+ throw e;
+ }
+ }
+ };
+ }
+
+ @Override
+ public Checker newChecker() {
+ List<RowChecker> checkers = new ArrayList<>(3);
+ checkers.add(new TaskCountChecker(maxTotalConcurrentTasks,
+ maxConcurrentTasksPerServer,
+ maxConcurrentTasksPerRegion,
+ tasksInProgress,
+ taskCounterPerServer,
+ taskCounterPerRegion));
+ checkers.add(new RequestSizeChecker(maxHeapSizePerRequest));
+ checkers.add(new SubmittedSizeChecker(maxHeapSizeSubmit));
+ return newChecker(checkers);
+ }
+
+ @Override
+ public void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
+ tasksInProgress.incrementAndGet();
+
+ computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet();
+
+ regions.forEach((regBytes)
+ -> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet()
+ );
+ }
+
+ @Override
+ public void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
+ // NOTE(review): assumes a prior incTaskCounters for the same regions and
+ // server; a missing region or server counter here would NPE.
+ regions.forEach(regBytes -> {
+ AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
+ regionCnt.decrementAndGet();
+ });
+
+ taskCounterPerServer.get(sn).decrementAndGet();
+ tasksInProgress.decrementAndGet();
+ // Wake up waiters blocked in waitForMaximumCurrentTasks/waitForRegion.
+ synchronized (tasksInProgress) {
+ tasksInProgress.notifyAll();
+ }
+ }
+
+ // The "Tsks" typo is inherited from the RequestController interface.
+ @Override
+ public long getNumberOfTsksInProgress() {
+ return tasksInProgress.get();
+ }
+
+ @Override
+ public void waitForMaximumCurrentTasks(long max, long id,
+ int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
+ assert max >= 0;
+ long lastLog = EnvironmentEdgeManager.currentTime();
+ long currentInProgress, oldInProgress = Long.MAX_VALUE;
+ while ((currentInProgress = tasksInProgress.get()) > max) {
+ if (oldInProgress != currentInProgress) { // Wait for in progress to change.
+ long now = EnvironmentEdgeManager.currentTime();
+ if (now > lastLog + periodToTrigger) {
+ lastLog = now;
+ if (trigger != null) {
+ trigger.accept(currentInProgress);
+ }
+ logDetailsOfUndoneTasks(currentInProgress);
+ }
+ }
+ oldInProgress = currentInProgress;
+ try {
+ // Timed wait bounds the poll; decTaskCounters notifies on completion.
+ synchronized (tasksInProgress) {
+ if (tasksInProgress.get() == oldInProgress) {
+ tasksInProgress.wait(10);
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("#" + id + ", interrupted." +
+ " currentNumberOfTask=" + currentInProgress);
+ }
+ }
+ }
+
+ // Logs which servers (and, below a tighter threshold, which regions) still
+ // have outstanding tasks, to help diagnose slow flushes.
+ private void logDetailsOfUndoneTasks(long taskInProgress) {
+ if (taskInProgress <= thresholdToLogUndoneTaskDetails) {
+ ArrayList<ServerName> servers = new ArrayList<>();
+ for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
+ if (entry.getValue().get() > 0) {
+ servers.add(entry.getKey());
+ }
+ }
+ LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
+ }
+
+ if (taskInProgress <= thresholdToLogRegionDetails) {
+ ArrayList<String> regions = new ArrayList<>();
+ for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
+ if (entry.getValue().get() > 0) {
+ regions.add(Bytes.toString(entry.getKey()));
+ }
+ }
+ LOG.info("Regions against which left over task(s) are processed: " + regions);
+ }
+ }
+
+ @Override
+ public void waitForFreeSlot(long id, int periodToTrigger, Consumer<Long> trigger) throws InterruptedIOException {
+ waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger);
+ }
+
+ /**
+ * limit the heapsize of total submitted data. Reduce the limit of heapsize
+ * for submitting quickly if there is no running task.
+ */
+ @VisibleForTesting
+ static class SubmittedSizeChecker implements RowChecker {
+
+ private final long maxHeapSizeSubmit;
+ private long heapSize = 0;
+
+ SubmittedSizeChecker(final long maxHeapSizeSubmit) {
+ this.maxHeapSizeSubmit = maxHeapSizeSubmit;
+ }
+
+ @Override
+ public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
+ if (heapSize >= maxHeapSizeSubmit) {
+ return ReturnCode.END;
+ }
+ return ReturnCode.INCLUDE;
+ }
+
+ @Override
+ public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
+ // Only rows that were actually included count toward the submit size.
+ if (code == ReturnCode.INCLUDE) {
+ heapSize += rowSize;
+ }
+ }
+
+ @Override
+ public void reset() {
+ heapSize = 0;
+ }
+ }
+
+ /**
+ * limit the max number of tasks in an AsyncProcess.
+ */
+ @VisibleForTesting
+ static class TaskCountChecker implements RowChecker {
+
+ private static final long MAX_WAITING_TIME = 1000; //ms
+ private final Set<HRegionInfo> regionsIncluded = new HashSet<>();
+ private final Set<ServerName> serversIncluded = new HashSet<>();
+ private final int maxConcurrentTasksPerRegion;
+ private final int maxTotalConcurrentTasks;
+ private final int maxConcurrentTasksPerServer;
+ private final Map<byte[], AtomicInteger> taskCounterPerRegion;
+ private final Map<ServerName, AtomicInteger> taskCounterPerServer;
+ private final Set<byte[]> busyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ private final AtomicLong tasksInProgress;
+
+ TaskCountChecker(final int maxTotalConcurrentTasks,
+ final int maxConcurrentTasksPerServer,
+ final int maxConcurrentTasksPerRegion,
+ final AtomicLong tasksInProgress,
+ final Map<ServerName, AtomicInteger> taskCounterPerServer,
+ final Map<byte[], AtomicInteger> taskCounterPerRegion) {
+ this.maxTotalConcurrentTasks = maxTotalConcurrentTasks;
+ this.maxConcurrentTasksPerRegion = maxConcurrentTasksPerRegion;
+ this.maxConcurrentTasksPerServer = maxConcurrentTasksPerServer;
+ this.taskCounterPerRegion = taskCounterPerRegion;
+ this.taskCounterPerServer = taskCounterPerServer;
+ this.tasksInProgress = tasksInProgress;
+ }
+
+ @Override
+ public void reset() throws InterruptedIOException {
+ // prevent the busy-waiting
+ waitForRegion();
+ regionsIncluded.clear();
+ serversIncluded.clear();
+ busyRegions.clear();
+ }
+
+ private void waitForRegion() throws InterruptedIOException {
+ if (busyRegions.isEmpty()) {
+ return;
+ }
+ EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
+ final long start = ee.currentTime();
+ // Bounded poll: give busy regions up to MAX_WAITING_TIME ms to drain.
+ while ((ee.currentTime() - start) <= MAX_WAITING_TIME) {
+ for (byte[] region : busyRegions) {
+ AtomicInteger count = taskCounterPerRegion.get(region);
+ if (count == null || count.get() < maxConcurrentTasksPerRegion) {
+ return;
+ }
+ }
+ try {
+ synchronized (tasksInProgress) {
+ tasksInProgress.wait(10);
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted."
+ + " tasksInProgress=" + tasksInProgress);
+ }
+ }
+ }
+
+ /**
+ * 1) check the regions is allowed. 2) check the concurrent tasks for
+ * regions. 3) check the total concurrent tasks. 4) check the concurrent
+ * tasks for server.
+ *
+ * @param loc the destination of the data
+ * @param rowSize the data size
+ * @return the decision for the operation
+ */
+ @Override
+ public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
+
+ HRegionInfo regionInfo = loc.getRegionInfo();
+ if (regionsIncluded.contains(regionInfo)) {
+ // We already know what to do with this region.
+ return ReturnCode.INCLUDE;
+ }
+ AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
+ if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
+ // Too many tasks on this region already.
+ return ReturnCode.SKIP;
+ }
+ int newServers = serversIncluded.size()
+ + (serversIncluded.contains(loc.getServerName()) ? 0 : 1);
+ if ((newServers + tasksInProgress.get()) > maxTotalConcurrentTasks) {
+ // Too many tasks.
+ return ReturnCode.SKIP;
+ }
+ AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
+ if (serverCnt != null && serverCnt.get() >= maxConcurrentTasksPerServer) {
+ // Too many tasks for this individual server
+ return ReturnCode.SKIP;
+ }
+ return ReturnCode.INCLUDE;
+ }
+
+ @Override
+ public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
+ if (code == ReturnCode.INCLUDE) {
+ regionsIncluded.add(loc.getRegionInfo());
+ serversIncluded.add(loc.getServerName());
+ }
+ // Track the region regardless of the decision so reset() can wait on it.
+ busyRegions.add(loc.getRegionInfo().getRegionName());
+ }
+ }
+
+ /**
+ * limit the request size for each regionserver.
+ */
+ @VisibleForTesting
+ static class RequestSizeChecker implements RowChecker {
+
+ private final long maxHeapSizePerRequest;
+ private final Map<ServerName, Long> serverRequestSizes = new HashMap<>();
+
+ RequestSizeChecker(final long maxHeapSizePerRequest) {
+ this.maxHeapSizePerRequest = maxHeapSizePerRequest;
+ }
+
+ @Override
+ public void reset() {
+ serverRequestSizes.clear();
+ }
+
+ @Override
+ public ReturnCode canTakeOperation(HRegionLocation loc, long rowSize) {
+ // Is it ok for limit of request size?
+ long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
+ ? serverRequestSizes.get(loc.getServerName()) : 0L;
+ // accept at least one request
+ if (currentRequestSize == 0 || currentRequestSize + rowSize <= maxHeapSizePerRequest) {
+ return ReturnCode.INCLUDE;
+ }
+ return ReturnCode.SKIP;
+ }
+
+ @Override
+ public void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize) {
+ if (code == ReturnCode.INCLUDE) {
+ long currentRequestSize = serverRequestSizes.containsKey(loc.getServerName())
+ ? serverRequestSizes.get(loc.getServerName()) : 0L;
+ serverRequestSizes.put(loc.getServerName(), currentRequestSize + rowSize);
+ }
+ }
+ }
+
+ /**
+ * Provide a way to control the flow of rows iteration.
+ */
+ @VisibleForTesting
+ interface RowChecker {
+
+ ReturnCode canTakeOperation(HRegionLocation loc, long rowSize);
+
+ /**
+ * Add the final ReturnCode to the checker. The ReturnCode may be reversed,
+ * so the checker need the final decision to update the inner state.
+ *
+ * @param code The final decision
+ * @param loc the destination of data
+ * @param rowSize the data size
+ */
+ void notifyFinal(ReturnCode code, HRegionLocation loc, long rowSize);
+
+ /**
+ * Reset the inner state.
+ */
+ void reset() throws InterruptedIOException;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cb55c40/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index bb6cbb5..ed7202a 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -33,12 +33,10 @@ import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -59,15 +57,8 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.AsyncProcess.ListRowAccess;
-import org.apache.hadoop.hbase.client.AsyncProcess.TaskCountChecker;
-import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker.ReturnCode;
-import org.apache.hadoop.hbase.client.AsyncProcess.RowCheckerHost;
-import org.apache.hadoop.hbase.client.AsyncProcess.RequestSizeChecker;
+import org.apache.hadoop.hbase.client.AsyncProcessTask.ListRowAccess;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@@ -78,60 +69,64 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
-import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker;
-import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker;
+import org.apache.hadoop.hbase.client.AsyncProcessTask.SubmittedRows;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+
@Category({ClientTests.class, MediumTests.class})
public class TestAsyncProcess {
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
withLookingForStuckThread(true).build();
- private final static Log LOG = LogFactory.getLog(TestAsyncProcess.class);
+ private static final Log LOG = LogFactory.getLog(TestAsyncProcess.class);
private static final TableName DUMMY_TABLE =
TableName.valueOf("DUMMY_TABLE");
private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes();
private static final byte[] FAILS = "FAILS".getBytes();
- private static final Configuration conf = new Configuration();
-
- private static ServerName sn = ServerName.valueOf("s1:1,1");
- private static ServerName sn2 = ServerName.valueOf("s2:2,2");
- private static ServerName sn3 = ServerName.valueOf("s3:3,3");
- private static HRegionInfo hri1 =
+ private static final Configuration CONF = new Configuration();
+ private static final ConnectionConfiguration CONNECTION_CONFIG = new ConnectionConfiguration(CONF);
+ private static final ServerName sn = ServerName.valueOf("s1:1,1");
+ private static final ServerName sn2 = ServerName.valueOf("s2:2,2");
+ private static final ServerName sn3 = ServerName.valueOf("s3:3,3");
+ private static final HRegionInfo hri1 =
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
- private static HRegionInfo hri2 =
+ private static final HRegionInfo hri2 =
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
- private static HRegionInfo hri3 =
+ private static final HRegionInfo hri3 =
new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
- private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
- private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
- private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
+ private static final HRegionLocation loc1 = new HRegionLocation(hri1, sn);
+ private static final HRegionLocation loc2 = new HRegionLocation(hri2, sn);
+ private static final HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
// Replica stuff
- private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1),
+ private static final HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1),
hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
- private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
- private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
+ private static final HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
+ private static final RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
- private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
+ private static final RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
new HRegionLocation(hri2r1, sn3));
- private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null);
+ private static final RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null);
private static final String success = "success";
private static Exception failure = new Exception("failure");
- private static int NB_RETRIES = 3;
+ private static final int NB_RETRIES = 3;
+ private static final int RPC_TIMEOUT = CONF.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ private static final int OPERATION_TIMEOUT = CONF.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+ HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
@BeforeClass
public static void beforeClass(){
- conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
+ CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
}
static class CountingThreadFactory implements ThreadFactory {
@@ -153,20 +148,21 @@ public class TestAsyncProcess {
final AtomicInteger nbActions = new AtomicInteger();
public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
public AtomicInteger callsCt = new AtomicInteger();
- private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
- private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+
private long previousTimeout = -1;
+ final ExecutorService service;
@Override
- protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
- List<Action> actions, long nonceGroup, ExecutorService pool,
- Batch.Callback<Res> callback, Object[] results, boolean needResults,
- CancellableRegionServerCallable callable, int curTimeout) {
+ protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(
+ AsyncProcessTask task, List<Action> actions, long nonceGroup) {
// Test HTable has tableName of null, so pass DUMMY_TABLE
+ AsyncProcessTask wrap = new AsyncProcessTask(task){
+ @Override
+ public TableName getTableName() {
+ return DUMMY_TABLE;
+ }
+ };
AsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl<Res>(
- DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults,
- results, callback, callable, operationTimeout, rpcTimeout, this);
+ wrap, actions, nonceGroup, this);
allReqs.add(r);
return r;
}
@@ -176,49 +172,54 @@ public class TestAsyncProcess {
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
- super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
- new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout,
- operationTimeout);
+ super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
+ service = new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
+ new SynchronousQueue<>(), new CountingThreadFactory(nbThreads));
}
public MyAsyncProcess(
ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
- super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
- new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf),
- rpcTimeout, operationTimeout);
+ super(hc, conf,
+ new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
+ service = Executors.newFixedThreadPool(5);
}
- public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
- @SuppressWarnings("unused") boolean dummy) {
- super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
- @Override
- public void execute(Runnable command) {
- throw new RejectedExecutionException("test under failure");
- }
- },
- new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf),
- rpcTimeout, operationTimeout);
+ public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
+ List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
+ boolean needResults) throws InterruptedIOException {
+ AsyncProcessTask task = AsyncProcessTask.newBuilder(callback)
+ .setPool(pool == null ? service : pool)
+ .setTableName(tableName)
+ .setRowAccess(rows)
+ .setSubmittedRows(atLeastOne ? SubmittedRows.AT_LEAST_ONE : SubmittedRows.NORMAL)
+ .setNeedResults(needResults)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .build();
+ return submit(task);
+ }
+
+ public <CResult> AsyncRequestFuture submit(TableName tableName,
+ final List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
+ boolean needResults) throws InterruptedIOException {
+ return submit(null, tableName, rows, atLeastOne, callback, needResults);
}
@Override
- public <Res> AsyncRequestFuture submit(TableName tableName, RowAccess<? extends Row> rows,
- boolean atLeastOne, Callback<Res> callback, boolean needResults)
+ public <Res> AsyncRequestFuture submit(AsyncProcessTask<Res> task)
throws InterruptedIOException {
+ previousTimeout = task.getRpcTimeout();
// We use results in tests to check things, so override to always save them.
- return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
+ AsyncProcessTask<Res> wrap = new AsyncProcessTask<Res>(task) {
+ @Override
+ public boolean getNeedResults() {
+ return true;
+ }
+ };
+ return super.submit(wrap);
}
@Override
- public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
- List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
- CancellableRegionServerCallable callable, int curTimeout) {
- previousTimeout = curTimeout;
- return super.submitAll(pool, tableName, rows, callback, results, callable, curTimeout);
- }
- @Override
protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable callable, int rpcTimeout) {
callsCt.incrementAndGet();
@@ -260,12 +261,9 @@ public class TestAsyncProcess {
static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
- public MyAsyncRequestFutureImpl(TableName tableName, List<Action> actions, long nonceGroup,
- ExecutorService pool, boolean needResults, Object[] results,
- Batch.Callback callback, CancellableRegionServerCallable callable, int operationTimeout,
- int rpcTimeout, AsyncProcess asyncProcess) {
- super(tableName, actions, nonceGroup, pool, needResults,
- results, callback, callable, operationTimeout, rpcTimeout, asyncProcess);
+ public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
+ long nonceGroup, AsyncProcess asyncProcess) {
+ super(task, actions, nonceGroup, asyncProcess);
}
@Override
@@ -483,7 +481,7 @@ public class TestAsyncProcess {
final boolean usedRegions[];
protected MyConnectionImpl2(List<HRegionLocation> hrl) throws IOException {
- super(conf);
+ super(CONF);
this.hrl = hrl;
this.usedRegions = new boolean[hrl.size()];
}
@@ -553,19 +551,7 @@ public class TestAsyncProcess {
long putsHeapSize = writeBuffer;
doSubmitRequest(writeBuffer, putsHeapSize);
}
- @Test
- public void testIllegalArgument() throws IOException {
- ClusterConnection conn = createHConnection();
- final long maxHeapSizePerRequest = conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
- AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
- conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, -1);
- try {
- MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
- fail("The maxHeapSizePerRequest must be bigger than zero");
- } catch (IllegalArgumentException e) {
- }
- conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest);
- }
+
@Test
public void testSubmitLargeRequestWithUnlimitedSize() throws Exception {
long maxHeapSizePerRequest = Long.MAX_VALUE;
@@ -601,10 +587,13 @@ public class TestAsyncProcess {
private void doSubmitRequest(long maxHeapSizePerRequest, long putsHeapSize) throws Exception {
ClusterConnection conn = createHConnection();
- final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
- AsyncProcess.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
- conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest);
- BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
+ final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
+ final long defaultHeapSizePerRequest = conn.getConfiguration().getLong(
+ SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
+ SimpleRequestController.DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
+ conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+ SimpleRequestController.class.getName());
+ conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, maxHeapSizePerRequest);
// sn has two regions
long putSizeSN = 0;
@@ -630,11 +619,12 @@ public class TestAsyncProcess {
+ ", maxHeapSizePerRequest:" + maxHeapSizePerRequest
+ ", minCountSnRequest:" + minCountSnRequest
+ ", minCountSn2Request:" + minCountSn2Request);
- try (HTable ht = new HTable(conn, bufferParam)) {
- MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
- ht.mutator.ap = ap;
- Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
+ try (HTable ht = new HTable(conn, mutator)) {
+ Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
ht.put(puts);
List<AsyncRequestFuture> reqs = ap.allReqs;
@@ -680,12 +670,17 @@ public class TestAsyncProcess {
assertEquals(putSizeSN2, (long) sizePerServers.get(sn2));
}
// restore config.
- conn.getConfiguration().setLong(AsyncProcess.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, defaultHeapSizePerRequest);
+ conn.getConfiguration().setLong(SimpleRequestController.HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, defaultHeapSizePerRequest);
+ if (defaultClazz != null) {
+ conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+ defaultClazz);
+ }
}
+
@Test
public void testSubmit() throws Exception {
ClusterConnection hc = createHConnection();
- AsyncProcess ap = new MyAsyncProcess(hc, conf);
+ MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true));
@@ -704,7 +699,7 @@ public class TestAsyncProcess {
updateCalled.incrementAndGet();
}
};
- AsyncProcess ap = new MyAsyncProcess(hc, conf);
+ MyAsyncProcess ap = new MyAsyncProcess(hc, CONF);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true));
@@ -717,13 +712,16 @@ public class TestAsyncProcess {
@Test
public void testSubmitBusyRegion() throws Exception {
- ClusterConnection hc = createHConnection();
- AsyncProcess ap = new MyAsyncProcess(hc, conf);
-
+ ClusterConnection conn = createHConnection();
+ final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
+ conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+ SimpleRequestController.class.getName());
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
+ SimpleRequestController controller = (SimpleRequestController) ap.requestController;
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true));
- for (int i = 0; i != ap.maxConcurrentTasksPerRegion; ++i) {
+ for (int i = 0; i != controller.maxConcurrentTasksPerRegion; ++i) {
ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
}
ap.submit(null, DUMMY_TABLE, puts, false, null, false);
@@ -732,15 +730,22 @@ public class TestAsyncProcess {
ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
ap.submit(null, DUMMY_TABLE, puts, false, null, false);
Assert.assertEquals(0, puts.size());
+ if (defaultClazz != null) {
+ conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+ defaultClazz);
+ }
}
@Test
public void testSubmitBusyRegionServer() throws Exception {
- ClusterConnection hc = createHConnection();
- AsyncProcess ap = new MyAsyncProcess(hc, conf);
-
- ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
+ ClusterConnection conn = createHConnection();
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF);
+ final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
+ conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+ SimpleRequestController.class.getName());
+ SimpleRequestController controller = (SimpleRequestController) ap.requestController;
+ controller.taskCounterPerServer.put(sn2, new AtomicInteger(controller.maxConcurrentTasksPerServer));
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true));
@@ -751,14 +756,18 @@ public class TestAsyncProcess {
ap.submit(null, DUMMY_TABLE, puts, false, null, false);
Assert.assertEquals(" puts=" + puts, 1, puts.size());
- ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
+ controller.taskCounterPerServer.put(sn2, new AtomicInteger(controller.maxConcurrentTasksPerServer - 1));
ap.submit(null, DUMMY_TABLE, puts, false, null, false);
Assert.assertTrue(puts.isEmpty());
+ if (defaultClazz != null) {
+ conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+ defaultClazz);
+ }
}
@Test
public void testFail() throws Exception {
- MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
+ MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
List<Put> puts = new ArrayList<Put>();
Put p = createPut(1, false);
@@ -784,10 +793,15 @@ public class TestAsyncProcess {
@Test
public void testSubmitTrue() throws IOException {
- final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
- ap.tasksInProgress.incrementAndGet();
- final AtomicInteger ai = new AtomicInteger(ap.maxConcurrentTasksPerRegion);
- ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
+ ClusterConnection conn = createHConnection();
+ final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, false);
+ final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
+ conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+ SimpleRequestController.class.getName());
+ SimpleRequestController controller = (SimpleRequestController) ap.requestController;
+ controller.tasksInProgress.incrementAndGet();
+ final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion);
+ controller.taskCounterPerRegion.put(hri1.getRegionName(), ai);
final AtomicBoolean checkPoint = new AtomicBoolean(false);
final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
@@ -798,7 +812,7 @@ public class TestAsyncProcess {
Threads.sleep(1000);
Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent
ai.decrementAndGet();
- ap.tasksInProgress.decrementAndGet();
+ controller.tasksInProgress.decrementAndGet();
checkPoint2.set(true);
}
};
@@ -819,11 +833,15 @@ public class TestAsyncProcess {
while (!checkPoint2.get()){
Threads.sleep(1);
}
+ if (defaultClazz != null) {
+ conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+ defaultClazz);
+ }
}
@Test
public void testFailAndSuccess() throws Exception {
- MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
+ MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, false));
@@ -850,7 +868,7 @@ public class TestAsyncProcess {
@Test
public void testFlush() throws Exception {
- MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
+ MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF, false);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, false));
@@ -868,24 +886,32 @@ public class TestAsyncProcess {
@Test
public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException {
ClusterConnection hc = createHConnection();
- MyAsyncProcess ap = new MyAsyncProcess(hc, conf, false);
+ MyAsyncProcess ap = new MyAsyncProcess(hc, CONF, false);
testTaskCount(ap);
}
@Test
public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException {
- Configuration copyConf = new Configuration(conf);
+ Configuration copyConf = new Configuration(CONF);
copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
MyClientBackoffPolicy bp = new MyClientBackoffPolicy();
- ClusterConnection hc = createHConnection();
- Mockito.when(hc.getConfiguration()).thenReturn(copyConf);
- Mockito.when(hc.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf));
- Mockito.when(hc.getBackoffPolicy()).thenReturn(bp);
- MyAsyncProcess ap = new MyAsyncProcess(hc, copyConf, false);
+ ClusterConnection conn = createHConnection();
+ Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
+ Mockito.when(conn.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf));
+ Mockito.when(conn.getBackoffPolicy()).thenReturn(bp);
+ final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
+ conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+ SimpleRequestController.class.getName());
+ MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, false);
testTaskCount(ap);
+ if (defaultClazz != null) {
+ conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+ defaultClazz);
+ }
}
- private void testTaskCount(AsyncProcess ap) throws InterruptedIOException, InterruptedException {
+ private void testTaskCount(MyAsyncProcess ap) throws InterruptedIOException, InterruptedException {
+ SimpleRequestController controller = (SimpleRequestController) ap.requestController;
List<Put> puts = new ArrayList<>();
for (int i = 0; i != 3; ++i) {
puts.add(createPut(1, true));
@@ -896,18 +922,24 @@ public class TestAsyncProcess {
ap.waitForMaximumCurrentTasks(0, null);
// More time to wait if there are incorrect task count.
TimeUnit.SECONDS.sleep(1);
- assertEquals(0, ap.tasksInProgress.get());
- for (AtomicInteger count : ap.taskCounterPerRegion.values()) {
+ assertEquals(0, controller.tasksInProgress.get());
+ for (AtomicInteger count : controller.taskCounterPerRegion.values()) {
assertEquals(0, count.get());
}
- for (AtomicInteger count : ap.taskCounterPerServer.values()) {
+ for (AtomicInteger count : controller.taskCounterPerServer.values()) {
assertEquals(0, count.get());
}
}
@Test
public void testMaxTask() throws Exception {
- final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
+ ClusterConnection conn = createHConnection();
+ final String defaultClazz = conn.getConfiguration().get(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY);
+ conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+ SimpleRequestController.class.getName());
+ final MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, false);
+ SimpleRequestController controller = (SimpleRequestController) ap.requestController;
+
for (int i = 0; i < 1000; i++) {
ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn);
@@ -940,7 +972,7 @@ public class TestAsyncProcess {
@Override
public void run() {
Threads.sleep(sleepTime);
- while (ap.tasksInProgress.get() > 0) {
+ while (controller.tasksInProgress.get() > 0) {
ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn);
}
}
@@ -953,6 +985,10 @@ public class TestAsyncProcess {
//Adds 100 to secure us against approximate timing.
Assert.assertTrue(start + 100L + sleepTime > end);
+ if (defaultClazz != null) {
+ conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY,
+ defaultClazz);
+ }
}
private static ClusterConnection createHConnection() throws IOException {
@@ -999,38 +1035,53 @@ public class TestAsyncProcess {
NonceGenerator ng = Mockito.mock(NonceGenerator.class);
Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
- Mockito.when(hc.getConfiguration()).thenReturn(conf);
+ Mockito.when(hc.getConfiguration()).thenReturn(CONF);
+ Mockito.when(hc.getConnectionConfiguration()).thenReturn(CONNECTION_CONFIG);
return hc;
}
@Test
public void testHTablePutSuccess() throws Exception {
- BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class);
- ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
+ ClusterConnection conn = createHConnection();
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl ht = new BufferedMutatorImpl(conn, bufferParam, ap);
Put put = createPut(1, true);
- Assert.assertEquals(0, ht.getWriteBufferSize());
+ Assert.assertEquals(conn.getConnectionConfiguration().getWriteBufferSize(), ht.getWriteBufferSize());
+ Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
ht.mutate(put);
- Assert.assertEquals(0, ht.getWriteBufferSize());
+ ht.flush();
+ Assert.assertEquals(0, ht.getCurrentWriteBufferSize());
+ }
+
+ @Test
+ public void testBufferedMutatorImplWithSharedPool() throws Exception {
+ ClusterConnection conn = createHConnection();
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutator ht = new BufferedMutatorImpl(conn, bufferParam, ap);
+
+ ht.close();
+ assertFalse(ap.service.isShutdown());
}
private void doHTableFailedPut(boolean bufferOn) throws Exception {
ClusterConnection conn = createHConnection();
- BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
if (bufferOn) {
bufferParam.writeBufferSize(1024L * 1024L);
} else {
bufferParam.writeBufferSize(0L);
}
-
- HTable ht = new HTable(conn, bufferParam);
- MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
- ht.mutator.ap = ap;
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
+ HTable ht = new HTable(conn, mutator);
Put put = createPut(1, false);
- Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
+ Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
try {
ht.put(put);
if (bufferOn) {
@@ -1039,7 +1090,7 @@ public class TestAsyncProcess {
Assert.fail();
} catch (RetriesExhaustedException expected) {
}
- Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
+ Assert.assertEquals(0L, ht.mutator.getCurrentWriteBufferSize());
// The table should have sent one request, maybe after multiple attempts
AsyncRequestFuture ars = null;
for (AsyncRequestFuture someReqs : ap.allReqs) {
@@ -1067,10 +1118,10 @@ public class TestAsyncProcess {
@Test
public void testHTableFailedPutAndNewPut() throws Exception {
ClusterConnection conn = createHConnection();
- BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
- new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0));
- MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
- mutator.ap = ap;
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE)
+ .writeBufferSize(0);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
Put p = createPut(1, false);
mutator.mutate(p);
@@ -1083,202 +1134,13 @@ public class TestAsyncProcess {
// puts, we may raise an exception in the middle of the list. It's then up to the caller to
// manage what was inserted, what was tried but failed, and what was not even tried.
p = createPut(1, true);
- Assert.assertEquals(0, mutator.writeAsyncBuffer.size());
+ Assert.assertEquals(0, mutator.size());
try {
mutator.mutate(p);
Assert.fail();
} catch (RetriesExhaustedException expected) {
}
- Assert.assertEquals("the put should not been inserted.", 0, mutator.writeAsyncBuffer.size());
- }
-
- @Test
- public void testTaskCheckerHost() throws IOException {
- final int maxTotalConcurrentTasks = 100;
- final int maxConcurrentTasksPerServer = 2;
- final int maxConcurrentTasksPerRegion = 1;
- final AtomicLong tasksInProgress = new AtomicLong(0);
- final Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
- final Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
- TaskCountChecker countChecker = new TaskCountChecker(
- maxTotalConcurrentTasks,
- maxConcurrentTasksPerServer,
- maxConcurrentTasksPerRegion,
- tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
- final long maxHeapSizePerRequest = 2 * 1024 * 1024;
- // unlimiited
- RequestSizeChecker sizeChecker = new RequestSizeChecker(maxHeapSizePerRequest);
- RowCheckerHost checkerHost = new RowCheckerHost(Arrays.asList(countChecker, sizeChecker));
-
- ReturnCode loc1Code = checkerHost.canTakeOperation(loc1, maxHeapSizePerRequest);
- assertEquals(RowChecker.ReturnCode.INCLUDE, loc1Code);
-
- ReturnCode loc1Code_2 = checkerHost.canTakeOperation(loc1, maxHeapSizePerRequest);
- // rejected for size
- assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc1Code_2);
-
- ReturnCode loc2Code = checkerHost.canTakeOperation(loc2, maxHeapSizePerRequest);
- // rejected for size
- assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc2Code);
-
- // fill the task slots for loc3.
- taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(100));
- taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100));
-
- ReturnCode loc3Code = checkerHost.canTakeOperation(loc3, 1L);
- // rejected for count
- assertNotEquals(RowChecker.ReturnCode.INCLUDE, loc3Code);
-
- // release the task slots for loc3.
- taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(0));
- taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0));
-
- ReturnCode loc3Code_2 = checkerHost.canTakeOperation(loc3, 1L);
- assertEquals(RowChecker.ReturnCode.INCLUDE, loc3Code_2);
- }
-
- @Test
- public void testRequestSizeCheckerr() throws IOException {
- final long maxHeapSizePerRequest = 2 * 1024 * 1024;
- final ClusterConnection conn = createHConnection();
- RequestSizeChecker checker = new RequestSizeChecker(maxHeapSizePerRequest);
-
- // inner state is unchanged.
- for (int i = 0; i != 10; ++i) {
- ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
- assertEquals(RowChecker.ReturnCode.INCLUDE, code);
- code = checker.canTakeOperation(loc2, maxHeapSizePerRequest);
- assertEquals(RowChecker.ReturnCode.INCLUDE, code);
- }
-
- // accept the data located on loc1 region.
- ReturnCode acceptCode = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
- assertEquals(RowChecker.ReturnCode.INCLUDE, acceptCode);
- checker.notifyFinal(acceptCode, loc1, maxHeapSizePerRequest);
-
- // the sn server reachs the limit.
- for (int i = 0; i != 10; ++i) {
- ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
- assertNotEquals(RowChecker.ReturnCode.INCLUDE, code);
- code = checker.canTakeOperation(loc2, maxHeapSizePerRequest);
- assertNotEquals(RowChecker.ReturnCode.INCLUDE, code);
- }
-
- // the request to sn2 server should be accepted.
- for (int i = 0; i != 10; ++i) {
- ReturnCode code = checker.canTakeOperation(loc3, maxHeapSizePerRequest);
- assertEquals(RowChecker.ReturnCode.INCLUDE, code);
- }
-
- checker.reset();
- for (int i = 0; i != 10; ++i) {
- ReturnCode code = checker.canTakeOperation(loc1, maxHeapSizePerRequest);
- assertEquals(RowChecker.ReturnCode.INCLUDE, code);
- code = checker.canTakeOperation(loc2, maxHeapSizePerRequest);
- assertEquals(RowChecker.ReturnCode.INCLUDE, code);
- }
- }
-
- @Test
- public void testSubmittedSizeChecker() {
- final long maxHeapSizeSubmit = 2 * 1024 * 1024;
- SubmittedSizeChecker checker = new SubmittedSizeChecker(maxHeapSizeSubmit);
-
- for (int i = 0; i != 10; ++i) {
- ReturnCode include = checker.canTakeOperation(loc1, 100000);
- assertEquals(ReturnCode.INCLUDE, include);
- }
-
- for (int i = 0; i != 10; ++i) {
- checker.notifyFinal(ReturnCode.INCLUDE, loc1, maxHeapSizeSubmit);
- }
-
- for (int i = 0; i != 10; ++i) {
- ReturnCode include = checker.canTakeOperation(loc1, 100000);
- assertEquals(ReturnCode.END, include);
- }
- for (int i = 0; i != 10; ++i) {
- ReturnCode include = checker.canTakeOperation(loc2, 100000);
- assertEquals(ReturnCode.END, include);
- }
- checker.reset();
- for (int i = 0; i != 10; ++i) {
- ReturnCode include = checker.canTakeOperation(loc1, 100000);
- assertEquals(ReturnCode.INCLUDE, include);
- }
- }
- @Test
- public void testTaskCountChecker() throws InterruptedIOException {
- long rowSize = 12345;
- int maxTotalConcurrentTasks = 100;
- int maxConcurrentTasksPerServer = 2;
- int maxConcurrentTasksPerRegion = 1;
- AtomicLong tasksInProgress = new AtomicLong(0);
- Map<ServerName, AtomicInteger> taskCounterPerServer = new HashMap<>();
- Map<byte[], AtomicInteger> taskCounterPerRegion = new HashMap<>();
- TaskCountChecker checker = new TaskCountChecker(
- maxTotalConcurrentTasks,
- maxConcurrentTasksPerServer,
- maxConcurrentTasksPerRegion,
- tasksInProgress, taskCounterPerServer, taskCounterPerRegion);
-
- // inner state is unchanged.
- for (int i = 0; i != 10; ++i) {
- ReturnCode code = checker.canTakeOperation(loc1, rowSize);
- assertEquals(RowChecker.ReturnCode.INCLUDE, code);
- }
- // add loc1 region.
- ReturnCode code = checker.canTakeOperation(loc1, rowSize);
- assertEquals(RowChecker.ReturnCode.INCLUDE, code);
- checker.notifyFinal(code, loc1, rowSize);
-
- // fill the task slots for loc1.
- taskCounterPerRegion.put(loc1.getRegionInfo().getRegionName(), new AtomicInteger(100));
- taskCounterPerServer.put(loc1.getServerName(), new AtomicInteger(100));
-
- // the region was previously accepted, so it must be accpted now.
- for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
- ReturnCode includeCode = checker.canTakeOperation(loc1, rowSize);
- assertEquals(RowChecker.ReturnCode.INCLUDE, includeCode);
- checker.notifyFinal(includeCode, loc1, rowSize);
- }
-
- // fill the task slots for loc3.
- taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(100));
- taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(100));
-
- // no task slots.
- for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
- ReturnCode excludeCode = checker.canTakeOperation(loc3, rowSize);
- assertNotEquals(RowChecker.ReturnCode.INCLUDE, excludeCode);
- checker.notifyFinal(excludeCode, loc3, rowSize);
- }
-
- // release the tasks for loc3.
- taskCounterPerRegion.put(loc3.getRegionInfo().getRegionName(), new AtomicInteger(0));
- taskCounterPerServer.put(loc3.getServerName(), new AtomicInteger(0));
-
- // add loc3 region.
- ReturnCode code3 = checker.canTakeOperation(loc3, rowSize);
- assertEquals(RowChecker.ReturnCode.INCLUDE, code3);
- checker.notifyFinal(code3, loc3, rowSize);
-
- // the region was previously accepted, so it must be accpted now.
- for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
- ReturnCode includeCode = checker.canTakeOperation(loc3, rowSize);
- assertEquals(RowChecker.ReturnCode.INCLUDE, includeCode);
- checker.notifyFinal(includeCode, loc3, rowSize);
- }
-
- checker.reset();
- // the region was previously accepted,
- // but checker have reseted and task slots for loc1 is full.
- // So it must be rejected now.
- for (int i = 0; i != maxConcurrentTasksPerRegion * 5; ++i) {
- ReturnCode includeCode = checker.canTakeOperation(loc1, rowSize);
- assertNotEquals(RowChecker.ReturnCode.INCLUDE, includeCode);
- checker.notifyFinal(includeCode, loc1, rowSize);
- }
+ Assert.assertEquals("the put should not been inserted.", 0, mutator.size());
}
@Test
@@ -1302,9 +1164,12 @@ public class TestAsyncProcess {
@Test
public void testBatch() throws IOException, InterruptedException {
- ClusterConnection conn = new MyConnectionImpl(conf);
- HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
- ht.multiAp = new MyAsyncProcess(conn, conf, false);
+ ClusterConnection conn = new MyConnectionImpl(CONF);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, CONF, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
+ HTable ht = new HTable(conn, mutator);
+ ht.multiAp = new MyAsyncProcess(conn, CONF, false);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true));
@@ -1332,18 +1197,16 @@ public class TestAsyncProcess {
}
@Test
public void testErrorsServers() throws IOException {
- Configuration configuration = new Configuration(conf);
+ Configuration configuration = new Configuration(CONF);
ClusterConnection conn = new MyConnectionImpl(configuration);
- BufferedMutatorImpl mutator =
- new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE));
- configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
-
MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true);
- mutator.ap = ap;
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
+ configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
- Assert.assertNotNull(mutator.ap.createServerErrorTracker());
- Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200);
- mutator.ap.serverTrackerTimeout = 1;
+ Assert.assertNotNull(ap.createServerErrorTracker());
+ Assert.assertTrue(ap.serverTrackerTimeout > 200);
+ ap.serverTrackerTimeout = 1;
Put p = createPut(1, false);
mutator.mutate(p);
@@ -1361,14 +1224,15 @@ public class TestAsyncProcess {
public void testReadAndWriteTimeout() throws IOException {
final long readTimeout = 10 * 1000;
final long writeTimeout = 20 * 1000;
- Configuration copyConf = new Configuration(conf);
+ Configuration copyConf = new Configuration(CONF);
copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
ClusterConnection conn = createHConnection();
Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
- BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
- try (HTable ht = new HTable(conn, bufferParam)) {
- MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
+ MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
+ try (HTable ht = new HTable(conn, mutator)) {
ht.multiAp = ap;
List<Get> gets = new LinkedList<>();
gets.add(new Get(DUMMY_BYTES_1));
@@ -1399,12 +1263,12 @@ public class TestAsyncProcess {
@Test
public void testGlobalErrors() throws IOException {
- ClusterConnection conn = new MyConnectionImpl(conf);
- BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
- AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test"));
- mutator.ap = ap;
+ ClusterConnection conn = new MyConnectionImpl(CONF);
+ AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new IOException("test"));
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
- Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+ Assert.assertNotNull(ap.createServerErrorTracker());
Put p = createPut(1, true);
mutator.mutate(p);
@@ -1421,13 +1285,11 @@ public class TestAsyncProcess {
@Test
public void testCallQueueTooLarge() throws IOException {
- ClusterConnection conn = new MyConnectionImpl(conf);
- BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
- AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException());
- mutator.ap = ap;
-
- Assert.assertNotNull(mutator.ap.createServerErrorTracker());
-
+ ClusterConnection conn = new MyConnectionImpl(CONF);
+ AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, CONF, new CallQueueTooBigException());
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
+ Assert.assertNotNull(ap.createServerErrorTracker());
Put p = createPut(1, true);
mutator.mutate(p);
@@ -1459,10 +1321,11 @@ public class TestAsyncProcess {
}
MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
- HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE));
- MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads);
+ MyAsyncProcess ap = new MyAsyncProcess(con, CONF, con.nbThreads);
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(con , bufferParam, ap);
+ HTable ht = new HTable(con, mutator);
ht.multiAp = ap;
-
ht.batch(gets, null);
Assert.assertEquals(ap.nbActions.get(), NB_REGS);
@@ -1482,7 +1345,16 @@ public class TestAsyncProcess {
// One region has no replica, so the main call succeeds for it.
MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
- AsyncRequestFuture ars = ap.submitAll(null,DUMMY_TABLE, rows, null, new Object[3]);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(ap.service)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(rows)
+ .setResults(new Object[3])
+ .setSubmittedRows(SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = ap.submit(task);
verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
Assert.assertEquals(2, ap.getReplicaCallCount());
}
@@ -1492,7 +1364,16 @@ public class TestAsyncProcess {
// Main call succeeds before replica calls are kicked off.
MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
- AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[3]);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(ap.service)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(rows)
+ .setResults(new Object[3])
+ .setSubmittedRows(SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = ap.submit(task);
verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
Assert.assertEquals(0, ap.getReplicaCallCount());
}
@@ -1502,7 +1383,16 @@ public class TestAsyncProcess {
// Either main or replica can succeed.
MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
- AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(ap.service)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(rows)
+ .setResults(new Object[2])
+ .setSubmittedRows(SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = ap.submit(task);
verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
long replicaCalls = ap.getReplicaCallCount();
Assert.assertTrue(replicaCalls >= 0);
@@ -1517,7 +1407,16 @@ public class TestAsyncProcess {
MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
ap.setPrimaryCallDelay(sn2, 2000);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
- AsyncRequestFuture ars = ap.submitAll(null ,DUMMY_TABLE, rows, null, new Object[2]);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(ap.service)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(rows)
+ .setResults(new Object[2])
+ .setSubmittedRows(SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = ap.submit(task);
verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
Assert.assertEquals(1, ap.getReplicaCallCount());
}
@@ -1530,7 +1429,16 @@ public class TestAsyncProcess {
MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 0);
ap.addFailures(hri1, hri2);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
- AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(ap.service)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(rows)
+ .setResults(new Object[2])
+ .setSubmittedRows(SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = ap.submit(task);
verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
Assert.assertEquals(0, ap.getReplicaCallCount());
}
@@ -1542,7 +1450,16 @@ public class TestAsyncProcess {
MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 0);
ap.addFailures(hri1, hri1r2, hri2);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
- AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(ap.service)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(rows)
+ .setResults(new Object[2])
+ .setSubmittedRows(SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = ap.submit(task);
verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
Assert.assertEquals(2, ap.getReplicaCallCount());
}
@@ -1554,7 +1471,16 @@ public class TestAsyncProcess {
MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 0);
ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
- AsyncRequestFuture ars = ap.submitAll(null, DUMMY_TABLE, rows, null, new Object[2]);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(ap.service)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(rows)
+ .setResults(new Object[2])
+ .setSubmittedRows(SubmittedRows.ALL)
+ .build();
+ AsyncRequestFuture ars = ap.submit(task);
verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
// We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1
Assert.assertEquals(3, ars.getErrors().getNumExceptions());
@@ -1583,6 +1509,13 @@ public class TestAsyncProcess {
return ap;
}
+ private static BufferedMutatorParams createBufferedMutatorParams(MyAsyncProcess ap, TableName name) {
+ return new BufferedMutatorParams(name)
+ .pool(ap.service)
+ .rpcTimeout(RPC_TIMEOUT)
+ .opertationTimeout(OPERATION_TIMEOUT);
+ }
+
private static List<Get> makeTimelineGets(byte[]... rows) {
List<Get> result = new ArrayList<Get>();
for (byte[] row : rows) {
@@ -1663,14 +1596,9 @@ public class TestAsyncProcess {
}
static class AsyncProcessForThrowableCheck extends AsyncProcess {
- private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
- private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
- public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf,
- ExecutorService pool) {
- super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
- conf), rpcTimeout, operationTimeout);
+ public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf) {
+ super(hc, conf, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
+ conf));
}
}
@@ -1681,56 +1609,22 @@ public class TestAsyncProcess {
MyThreadPoolExecutor myPool =
new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(200));
- AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, conf, myPool);
+ AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, CONF);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true));
-
- ap.submit(null, DUMMY_TABLE, puts, false, null, false);
+ AsyncProcessTask task = AsyncProcessTask.newBuilder()
+ .setPool(myPool)
+ .setRpcTimeout(RPC_TIMEOUT)
+ .setOperationTimeout(OPERATION_TIMEOUT)
+ .setTableName(DUMMY_TABLE)
+ .setRowAccess(puts)
+ .setSubmittedRows(SubmittedRows.NORMAL)
+ .build();
+ ap.submit(task);
Assert.assertTrue(puts.isEmpty());
}
- @Test
- public void testWaitForMaximumCurrentTasks() throws Exception {
- final AtomicLong tasks = new AtomicLong(0);
- final AtomicInteger max = new AtomicInteger(0);
- final CyclicBarrier barrier = new CyclicBarrier(2);
- final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf);
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- try {
- barrier.await();
- ap.waitForMaximumCurrentTasks(max.get(), tasks, 1, null);
- } catch (InterruptedIOException e) {
- Assert.fail(e.getMessage());
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- };
- // First test that our runnable thread only exits when tasks is zero.
- Thread t = new Thread(runnable);
- t.start();
- barrier.await();
- t.join();
- // Now assert we stay running if max == zero and tasks is > 0.
- barrier.reset();
- tasks.set(1000000);
- t = new Thread(runnable);
- t.start();
- barrier.await();
- while (tasks.get() > 0) {
- assertTrue(t.isAlive());
- tasks.set(tasks.get() - 1);
- }
- t.join();
- }
-
/**
* Test and make sure we could use a special pause setting when retry with
* CallQueueTooBigException, see HBASE-17114
@@ -1738,18 +1632,18 @@ public class TestAsyncProcess {
*/
@Test
public void testRetryPauseWithCallQueueTooBigException() throws Exception {
- Configuration myConf = new Configuration(conf);
+ Configuration myConf = new Configuration(CONF);
final long specialPause = 500L;
final int retries = 1;
myConf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, specialPause);
myConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
ClusterConnection conn = new MyConnectionImpl(myConf);
- BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
AsyncProcessWithFailure ap =
new AsyncProcessWithFailure(conn, myConf, new CallQueueTooBigException());
- mutator.ap = ap;
+ BufferedMutatorParams bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
- Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+ Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
Put p = createPut(1, true);
mutator.mutate(p);
@@ -1775,8 +1669,9 @@ public class TestAsyncProcess {
final long normalPause =
myConf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
ap = new AsyncProcessWithFailure(conn, myConf, new IOException());
- mutator.ap = ap;
- Assert.assertNotNull(mutator.ap.createServerErrorTracker());
+ bufferParam = createBufferedMutatorParams(ap, DUMMY_TABLE);
+ mutator = new BufferedMutatorImpl(conn, bufferParam, ap);
+ Assert.assertNotNull(mutator.getAsyncProcess().createServerErrorTracker());
mutator.mutate(p);
startTime = System.currentTimeMillis();
try {