You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/04/05 02:40:21 UTC
svn commit: r1464798 - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-it/src/test/java/org/apache/hadoop/hbase/
hbase-server/src/test/java/org/apache/hadoop/hbase/client/
Author: sershe
Date: Fri Apr 5 00:40:21 2013
New Revision: 1464798
URL: http://svn.apache.org/r1464798
Log:
HBASE-7649 client retry timeout doesn't need to do x2 fallback when going to different server
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java
hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1464798&r1=1464797&r2=1464798&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri Apr 5 00:40:21 2013
@@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.zookeeper
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
/**
@@ -167,6 +168,8 @@ public class HConnectionManager {
/** Default admin protocol class name. */
public static final String DEFAULT_ADMIN_PROTOCOL_CLASS = AdminProtocol.class.getName();
+ public static final String RETRIES_BY_SERVER = "hbase.client.retries.by.server";
+
private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
static {
@@ -513,10 +516,12 @@ public class HConnectionManager {
private final Class<? extends AdminProtocol> adminClass;
private final Class<? extends ClientProtocol> clientClass;
private final long pause;
- private final int numRetries;
+ private final int numTries;
private final int maxRPCAttempts;
private final int rpcTimeout;
private final int prefetchRegionLimit;
+ private final boolean useServerTrackerForRetries;
+ private final long serverTrackerTimeout;
private volatile boolean closed;
private volatile boolean aborted;
@@ -602,7 +607,7 @@ public class HConnectionManager {
}
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
- this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.maxRPCAttempts = conf.getInt(
HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS,
@@ -613,7 +618,21 @@ public class HConnectionManager {
this.prefetchRegionLimit = conf.getInt(
HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
-
+ this.useServerTrackerForRetries = conf.getBoolean(RETRIES_BY_SERVER, true);
+ long serverTrackerTimeout = 0;
+ if (this.useServerTrackerForRetries) {
+ // Server tracker allows us to do faster, and yet useful (hopefully), retries.
+ // However, if we are too useful, we might fail very quickly due to retry count limit.
+ // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
+ // retry time if normal retries were used. Then we will retry until this time runs out.
+ // If we keep hitting one server, the net effect will be the incremental backoff, and
+ // essentially the same number of retries as planned. If we have to do faster retries,
+ // we will do more retries in aggregate, but the user will be none the wiser.
+ for (int i = 0; i < this.numTries; ++i) {
+ serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
+ }
+ }
+ this.serverTrackerTimeout = serverTrackerTimeout;
retrieveClusterId();
// ProtobufRpcClientEngine is the main RpcClientEngine implementation,
@@ -772,10 +791,10 @@ public class HConnectionManager {
if (exceptionCaught != null)
// It failed. If it's not the last try, we're going to wait a little
- if (tries < numRetries) {
+ if (tries < numTries) {
// tries at this point is 1 or more; decrement to start from 0.
long pauseTime = ConnectionUtils.getPauseTime(this.pause, tries - 1);
- LOG.info("getMaster attempt " + tries + " of " + numRetries +
+ LOG.info("getMaster attempt " + tries + " of " + numTries +
" failed; retrying after sleep of " +pauseTime + ", exception=" + exceptionCaught);
try {
@@ -788,7 +807,7 @@ public class HConnectionManager {
} else {
// Enough tries, we stop now
- LOG.info("getMaster attempt " + tries + " of " + numRetries +
+ LOG.info("getMaster attempt " + tries + " of " + numTries +
" failed; no more retrying.", exceptionCaught);
throw new MasterNotRunningException(exceptionCaught);
}
@@ -1103,7 +1122,7 @@ public class HConnectionManager {
return location;
}
}
- int localNumRetries = retry ? numRetries : 1;
+ int localNumRetries = retry ? numTries : 1;
// build the key of the meta region we should be looking for.
// the extra 9's on the end are necessary to allow "exact" matches
// without knowing the precise region names.
@@ -1112,7 +1131,7 @@ public class HConnectionManager {
for (int tries = 0; true; tries++) {
if (tries >= localNumRetries) {
throw new NoServerForRegionException("Unable to find region for "
- + Bytes.toStringBinary(row) + " after " + numRetries + " tries.");
+ + Bytes.toStringBinary(row) + " after " + numTries + " tries.");
}
HRegionLocation metaLocation = null;
@@ -1210,13 +1229,13 @@ public class HConnectionManager {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
- if (tries < numRetries - 1) {
+ if (tries < numTries - 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("locateRegionInMeta parentTable=" +
Bytes.toString(parentTable) + ", metaLocation=" +
((metaLocation == null)? "null": "{" + metaLocation + "}") +
", attempt=" + tries + " of " +
- this.numRetries + " failed; retrying after sleep of " +
+ this.numTries + " failed; retrying after sleep of " +
ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
}
} else {
@@ -1969,6 +1988,8 @@ public class HConnectionManager {
private final List<Action<R>> toReplay;
private final LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>
inProgress;
+
+ private ServerErrorTracker errorsByServer = null;
private int curNumRetries;
// Notified when a tasks is done
@@ -1994,10 +2015,11 @@ public class HConnectionManager {
* Group a list of actions per region servers, and send them. The created MultiActions are
* added to the inProgress list.
* @param actionsList
- * @param sleepTime - sleep time before actually executing the actions. Can be zero.
+ * @param isRetry Whether we are retrying these actions. If yes, backoff
+ * time may be applied before new requests.
* @throws IOException - if we can't locate a region after multiple retries.
*/
- private void submit(List<Action<R>> actionsList, final long sleepTime) throws IOException {
+ private void submit(List<Action<R>> actionsList, final boolean isRetry) throws IOException {
// group per location => regions server
final Map<HRegionLocation, MultiAction<R>> actionsByServer =
new HashMap<HRegionLocation, MultiAction<R>>();
@@ -2022,15 +2044,25 @@ public class HConnectionManager {
// Send the queries and add them to the inProgress list
for (Entry<HRegionLocation, MultiAction<R>> e : actionsByServer.entrySet()) {
+ long backoffTime = 0;
+ if (isRetry) {
+ if (hci.isUsingServerTrackerForRetries()) {
+ assert this.errorsByServer != null;
+ backoffTime = this.errorsByServer.calculateBackoffTime(e.getKey(), hci.pause);
+ } else {
+ // curNumRetries starts with one, subtract to start from 0.
+ backoffTime = ConnectionUtils.getPauseTime(hci.pause, curNumRetries - 1);
+ }
+ }
Callable<MultiResponse> callable =
- createDelayedCallable(sleepTime, e.getKey(), e.getValue());
- if (LOG.isTraceEnabled() && (sleepTime > 0)) {
+ createDelayedCallable(backoffTime, e.getKey(), e.getValue());
+ if (LOG.isTraceEnabled() && isRetry) {
StringBuilder sb = new StringBuilder();
for (Action<R> action : e.getValue().allActions()) {
sb.append(Bytes.toStringBinary(action.getAction().getRow())).append(';');
}
- LOG.trace("Sending requests to [" + e.getKey().getHostnamePort()
- + "] with delay of [" + sleepTime + "] for rows [" + sb.toString() + "]");
+ LOG.trace("Will retry requests to [" + e.getKey().getHostnamePort()
+ + "] after delay of [" + backoffTime + "] for rows [" + sb.toString() + "]");
}
Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> p =
new Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>(
@@ -2044,9 +2076,7 @@ public class HConnectionManager {
* @throws IOException
*/
private void doRetry() throws IOException{
- // curNumRetries at this point is 1 or more; decrement to start from 0.
- final long sleepTime = ConnectionUtils.getPauseTime(hci.pause, this.curNumRetries - 1);
- submit(this.toReplay, sleepTime);
+ submit(this.toReplay, true);
this.toReplay.clear();
}
@@ -2085,7 +2115,7 @@ public class HConnectionManager {
}
// execute the actions. We will analyze and resubmit the actions in a 'while' loop.
- submit(listActions, 0);
+ submit(listActions, false);
// LastRetry is true if, either:
// we had an exception 'DoNotRetry'
@@ -2094,7 +2124,7 @@ public class HConnectionManager {
boolean lastRetry = false;
// despite its name numRetries means number of tries. So if numRetries == 1 it means we
// won't retry. And we compare vs. 2 in case someone set it to zero.
- boolean noRetry = (hci.numRetries < 2);
+ boolean noRetry = (hci.numTries < 2);
// Analyze and resubmit until all actions are done successfully or failed after numRetries
while (!this.inProgress.isEmpty()) {
@@ -2112,7 +2142,7 @@ public class HConnectionManager {
} catch (ExecutionException e) {
exception = e;
}
-
+ HRegionLocation location = currentTask.getSecond();
// Error case: no result at all for this multi action. We need to redo all actions
if (responses == null) {
for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
@@ -2120,14 +2150,14 @@ public class HConnectionManager {
Row row = action.getAction();
// Do not use the exception for updating cache because it might be coming from
// any of the regions in the MultiAction.
- hci.updateCachedLocations(tableName, row, null, currentTask.getSecond());
+ hci.updateCachedLocations(tableName, row, null, location);
if (noRetry) {
- errors.add(exception, row, currentTask);
+ errors.add(exception, row, location);
} else {
if (isTraceEnabled) {
- retriedErrors.add(exception, row, currentTask);
+ retriedErrors.add(exception, row, location);
}
- lastRetry = addToReplay(nbRetries, action);
+ lastRetry = addToReplay(nbRetries, action, location);
}
}
}
@@ -2146,14 +2176,14 @@ public class HConnectionManager {
// Failure: retry if it's make sense else update the errors lists
if (result == null || result instanceof Throwable) {
Row row = correspondingAction.getAction();
- hci.updateCachedLocations(this.tableName, row, result, currentTask.getSecond());
+ hci.updateCachedLocations(this.tableName, row, result, location);
if (result instanceof DoNotRetryIOException || noRetry) {
- errors.add((Exception)result, row, currentTask);
+ errors.add((Exception)result, row, location);
} else {
if (isTraceEnabled) {
- retriedErrors.add((Exception)result, row, currentTask);
+ retriedErrors.add((Exception)result, row, location);
}
- lastRetry = addToReplay(nbRetries, correspondingAction);
+ lastRetry = addToReplay(nbRetries, correspondingAction, location);
}
} else // success
if (callback != null) {
@@ -2186,11 +2216,10 @@ public class HConnectionManager {
private List<Row> actions = new ArrayList<Row>();
private List<String> addresses = new ArrayList<String>();
- public void add(Exception ex, Row row,
- Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> obj) {
+ public void add(Exception ex, Row row, HRegionLocation location) {
exceptions.add(ex);
actions.add(row);
- addresses.add(obj.getSecond().getHostnamePort());
+ addresses.add(location.getHostnamePort());
}
public void rethrowIfAny() throws RetriesExhaustedWithDetailsException {
@@ -2219,17 +2248,24 @@ public class HConnectionManager {
* Put the action that has to be retried in the Replay list.
* @return true if we're out of numRetries and it's the last retry.
*/
- private boolean addToReplay(int[] nbRetries, Action<R> action) {
+ private boolean addToReplay(int[] nbRetries, Action<R> action, HRegionLocation source) {
this.toReplay.add(action);
nbRetries[action.getOriginalIndex()]++;
if (nbRetries[action.getOriginalIndex()] > this.curNumRetries) {
this.curNumRetries = nbRetries[action.getOriginalIndex()];
}
- // numRetries means number of tries, while curNumRetries means current number of retries. So
- // we need to add 1 to make them comparable. And as we look for the last try we compare
- // with '>=' and no '>'. And we need curNumRetries to means what it says as we don't want
- // to initialize it to 1.
- return ( (this.curNumRetries +1) >= hci.numRetries);
+ if (hci.isUsingServerTrackerForRetries()) {
+ if (this.errorsByServer == null) {
+ this.errorsByServer = hci.createServerErrorTracker();
+ }
+ this.errorsByServer.reportServerError(source);
+ return !this.errorsByServer.canRetryMore();
+ } else {
+ // We need to add 1 to make tries and retries comparable. And as we look for
+ // the last try we compare with '>=' and not '>'. And we need curNumRetries
+ // to mean what it says as we don't want to initialize it to 1.
+ return ((this.curNumRetries + 1) >= hci.numTries);
+ }
}
/**
@@ -2521,8 +2557,102 @@ public class HConnectionManager {
void setRpcEngine(RpcClientEngine engine) {
this.rpcEngine = engine;
}
- }
+ /**
+ * The record of errors for servers. Visible for testing.
+ */
+ @VisibleForTesting
+ static class ServerErrorTracker {
+ private final Map<HRegionLocation, ServerErrors> errorsByServer =
+ new HashMap<HRegionLocation, ServerErrors>();
+ private long canRetryUntil = 0;
+
+ public ServerErrorTracker(long timeout) {
+ LOG.info("Server tracker timeout is " + timeout + "ms");
+ this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
+ }
+
+ boolean canRetryMore() {
+ return EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil;
+ }
+
+ /**
+ * Calculates the back-off time for a retrying request to a particular server.
+ * This is here, and package private, for testing (no good way to get at it).
+ * @param server The server in question.
+ * @param basePause The default hci pause.
+ * @return The time to wait before sending next request.
+ */
+ long calculateBackoffTime(HRegionLocation server, long basePause) {
+ long result = 0;
+ ServerErrors errorStats = errorsByServer.get(server);
+ if (errorStats != null) {
+ result = ConnectionUtils.getPauseTime(basePause, errorStats.retries);
+ // Adjust by the time we already waited since last talking to this server.
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ long timeSinceLastError = now - errorStats.getLastErrorTime();
+ if (timeSinceLastError > 0) {
+ result = Math.max(0, result - timeSinceLastError);
+ }
+ // Finally, see if the backoff time overshoots the timeout.
+ if (result > 0 && (now + result > this.canRetryUntil)) {
+ result = Math.max(0, this.canRetryUntil - now);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Reports that there was an error on the server to do whatever bean-counting is necessary.
+ * This is here, and package private, for testing (no good way to get at it).
+ * @param server The server in question.
+ */
+ void reportServerError(HRegionLocation server) {
+ ServerErrors errors = errorsByServer.get(server);
+ if (errors != null) {
+ errors.addError();
+ } else {
+ errorsByServer.put(server, new ServerErrors());
+ }
+ }
+
+ /**
+ * The record of errors for a server.
+ */
+ private static class ServerErrors {
+ public long lastErrorTime;
+ public int retries;
+
+ public ServerErrors() {
+ this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
+ this.retries = 0;
+ }
+
+ public void addError() {
+ this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
+ ++this.retries;
+ }
+
+ public long getLastErrorTime() {
+ return this.lastErrorTime;
+ }
+ }
+ }
+
+ public boolean isUsingServerTrackerForRetries() {
+ return this.useServerTrackerForRetries;
+ }
+ /**
+ * Creates the server error tracker to use inside process.
+ * Currently, to preserve the main assumption about current retries, and to work well with
+ * the retry-limit-based calculation, the calculation is local per Process object.
+ * We may benefit from connection-wide tracking of server errors.
+ * @return ServerErrorTracker to use.
+ */
+ ServerErrorTracker createServerErrorTracker() {
+ return new ServerErrorTracker(this.serverTrackerTimeout);
+ }
+ }
/**
* Set the number of retries to use serverside when trying to communicate
* with another server over {@link HConnection}. Used updating catalog
Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java?rev=1464798&r1=1464797&r2=1464798&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java Fri Apr 5 00:40:21 2013
@@ -24,6 +24,7 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestTool;
@@ -42,9 +43,9 @@ public abstract class IngestIntegrationT
protected HBaseCluster cluster;
private LoadTestTool loadTool;
- protected void setUp(int numSlavesBase) throws Exception {
+ protected void setUp(int numSlavesBase, Configuration conf) throws Exception {
tableName = this.getClass().getSimpleName();
- util = new IntegrationTestingUtility();
+ util = (conf == null) ? new IntegrationTestingUtility() : new IntegrationTestingUtility(conf);
LOG.info("Initializing cluster with " + numSlavesBase + " servers");
util.initializeCluster(numSlavesBase);
LOG.info("Done initializing cluster");
@@ -58,6 +59,10 @@ public abstract class IngestIntegrationT
Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
}
+ protected void setUp(int numSlavesBase) throws Exception {
+ setUp(numSlavesBase, null);
+ }
+
protected void tearDown() throws Exception {
LOG.info("Restoring the cluster");
util.restoreCluster();
Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java?rev=1464798&r1=1464797&r2=1464798&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServersTargeted.java Fri Apr 5 00:40:21 2013
@@ -27,6 +27,8 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChaosMonkey;
import org.apache.hadoop.hbase.util.Pair;
@@ -100,7 +102,9 @@ public class IntegrationTestRebalanceAnd
@Before
@SuppressWarnings("unchecked")
public void setUp() throws Exception {
- super.setUp(NUM_SLAVES_BASE);
+ Configuration conf = HBaseConfiguration.create();
+ conf.set(HConnectionManager.RETRIES_BY_SERVER, "true");
+ super.setUp(NUM_SLAVES_BASE, conf);
ChaosMonkey.Policy chaosPolicy = new ChaosMonkey.PeriodicRandomActionPolicy(
CHAOS_EVERY_MS, new UnbalanceKillAndRebalanceAction());
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1464798&r1=1464797&r2=1464798&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Fri Apr 5 00:40:21 2013
@@ -18,10 +18,7 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -42,6 +39,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
@@ -55,6 +53,8 @@ import org.apache.hadoop.hbase.master.HM
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
@@ -304,13 +304,13 @@ public class TestHCM {
// Hijack the number of retry to fail immediately instead of retrying: there will be no new
// connection to the master
- Field numRetries = conn.getClass().getDeclaredField("numRetries");
- numRetries.setAccessible(true);
+ Field numTries = conn.getClass().getDeclaredField("numTries");
+ numTries.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
- modifiersField.setInt(numRetries, numRetries.getModifiers() & ~Modifier.FINAL);
- final int prevNumRetriesVal = (Integer)numRetries.get(conn);
- numRetries.set(conn, 1);
+ modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
+ final int prevNumRetriesVal = (Integer)numTries.get(conn);
+ numTries.set(conn, 1);
// We do a put and expect the cache to be updated, even if we don't retry
LOG.info("Put starting");
@@ -379,7 +379,7 @@ public class TestHCM {
"Previous server was "+destServer.getServerName().getHostAndPort(),
curServer.getServerName().getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getPort());
- numRetries.set(conn, prevNumRetriesVal);
+ numTries.set(conn, prevNumRetriesVal);
table.close();
}
@@ -705,13 +705,13 @@ public class TestHCM {
conn.getCachedLocation(TABLE_NAME3, ROW_X).getPort() == destServerName.getPort());
// Hijack the number of retry to fail after 2 tries
- Field numRetries = conn.getClass().getDeclaredField("numRetries");
- numRetries.setAccessible(true);
+ Field numTries = conn.getClass().getDeclaredField("numTries");
+ numTries.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
- modifiersField.setInt(numRetries, numRetries.getModifiers() & ~Modifier.FINAL);
- final int prevNumRetriesVal = (Integer)numRetries.get(conn);
- numRetries.set(conn, 2);
+ modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
+ final int prevNumRetriesVal = (Integer)numTries.get(conn);
+ numTries.set(conn, 2);
Put put3 = new Put(ROW_X);
put3.add(FAM_NAM, ROW_X, ROW_X);
@@ -722,10 +722,83 @@ public class TestHCM {
table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row,
// second we get RegionMovedException.
- numRetries.set(conn, prevNumRetriesVal);
+ numTries.set(conn, prevNumRetriesVal);
table.close();
conn.close();
}
+ @Test
+ public void testErrorBackoffTimeCalculation() throws Exception {
+ final long ANY_PAUSE = 1000;
+ HRegionInfo ri = new HRegionInfo(TABLE_NAME);
+ HRegionLocation location = new HRegionLocation(ri, new ServerName("127.0.0.1", 1, 0));
+ HRegionLocation diffLocation = new HRegionLocation(ri, new ServerName("127.0.0.1", 2, 0));
+
+ ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(timeMachine);
+ try {
+ long timeBase = timeMachine.currentTimeMillis();
+ long largeAmountOfTime = ANY_PAUSE * 1000;
+ HConnectionImplementation.ServerErrorTracker tracker =
+ new HConnectionImplementation.ServerErrorTracker(largeAmountOfTime);
+
+ // The default backoff is 0.
+ assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
+
+ // Check some backoff values from HConstants sequence.
+ tracker.reportServerError(location);
+ assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(location, ANY_PAUSE));
+ tracker.reportServerError(location);
+ tracker.reportServerError(location);
+ tracker.reportServerError(location);
+ assertEqualsWithJitter(ANY_PAUSE * 2, tracker.calculateBackoffTime(location, ANY_PAUSE));
+
+ // All of this shouldn't affect backoff for different location.
+
+ assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
+ tracker.reportServerError(diffLocation);
+ assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
+
+ // But should still work for a different region in the same location.
+ HRegionInfo ri2 = new HRegionInfo(TABLE_NAME2);
+ HRegionLocation diffRegion = new HRegionLocation(ri2, location.getServerName());
+ assertEqualsWithJitter(ANY_PAUSE * 2, tracker.calculateBackoffTime(diffRegion, ANY_PAUSE));
+
+ // Check with different base.
+ assertEqualsWithJitter(ANY_PAUSE * 4,
+ tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
+
+ // See that time from last error is taken into account. Time shift is applied after jitter,
+ // so pass the original expected backoff as the base for jitter.
+ long timeShift = (long)(ANY_PAUSE * 0.5);
+ timeMachine.setValue(timeBase + timeShift);
+ assertEqualsWithJitter(ANY_PAUSE * 2 - timeShift,
+ tracker.calculateBackoffTime(location, ANY_PAUSE), ANY_PAUSE * 2);
+
+ // However we should not go into negative.
+ timeMachine.setValue(timeBase + ANY_PAUSE * 100);
+ assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
+
+ // We also should not go over the boundary; last retry would be on it.
+ long timeLeft = (long)(ANY_PAUSE * 0.5);
+ timeMachine.setValue(timeBase + largeAmountOfTime - timeLeft);
+ assertTrue(tracker.canRetryMore());
+ tracker.reportServerError(location);
+ assertEquals(timeLeft, tracker.calculateBackoffTime(location, ANY_PAUSE));
+ timeMachine.setValue(timeBase + largeAmountOfTime);
+ assertFalse(tracker.canRetryMore());
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ }
+
+ private static void assertEqualsWithJitter(long expected, long actual) {
+ assertEqualsWithJitter(expected, actual, expected);
+ }
+
+ private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
+ assertTrue("Value not within jitter: " + expected + " vs " + actual,
+ Math.abs(actual - expected) <= (0.01f * jitterBase));
+ }
}