You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nk...@apache.org on 2013/10/29 20:50:08 UTC
svn commit: r1536871 - in /hbase/branches/0.96:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/test/java/org/apache/hadoop/hbase/client/
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-server/src/main/java/org/apa...
Author: nkeywal
Date: Tue Oct 29 19:50:08 2013
New Revision: 1536871
URL: http://svn.apache.org/r1536871
Log:
HBASE-9843 Various fixes in client code
Modified:
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1536871&r1=1536870&r2=1536871&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Tue Oct 29 19:50:08 2013
@@ -87,7 +87,9 @@ import org.cloudera.htrace.Trace;
*/
class AsyncProcess<CResult> {
private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
- private final static int START_LOG_ERRORS_CNT = 4;
+ protected static final AtomicLong COUNTER = new AtomicLong();
+ protected final long id;
+ private final int startLogErrorsCnt;
protected final HConnection hConnection;
protected final TableName tableName;
protected final ExecutorService pool;
@@ -97,6 +99,7 @@ class AsyncProcess<CResult> {
protected final AtomicBoolean hasError = new AtomicBoolean(false);
protected final AtomicLong tasksSent = new AtomicLong(0);
protected final AtomicLong tasksDone = new AtomicLong(0);
+ protected final AtomicLong retriesCnt = new AtomicLong(0);
protected final ConcurrentMap<String, AtomicInteger> taskCounterPerRegion =
new ConcurrentHashMap<String, AtomicInteger>();
protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
@@ -121,7 +124,6 @@ class AsyncProcess<CResult> {
protected final int maxConcurrentTasksPerServer;
protected final long pause;
protected int numTries;
- protected final boolean useServerTrackerForRetries;
protected int serverTrackerTimeout;
protected RpcRetryingCallerFactory rpcCallerFactory;
@@ -205,6 +207,8 @@ class AsyncProcess<CResult> {
this.pool = pool;
this.callback = callback;
+ this.id = COUNTER.incrementAndGet();
+
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
@@ -217,6 +221,11 @@ class AsyncProcess<CResult> {
this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
+ // A few failures are fine: region moved, then is not opened, then is overloaded. We try
+ // to have an acceptable heuristic for the number of errors we don't log.
+ // 9 was chosen because we wait for 1s at this stage.
+ this.startLogErrorsCnt = conf.getInt("hbase.client.start.log.errors.counter", 9);
+
if (this.maxTotalConcurrentTasks <= 0) {
throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
}
@@ -229,23 +238,19 @@ class AsyncProcess<CResult> {
maxConcurrentTasksPerRegion);
}
- this.useServerTrackerForRetries =
- conf.getBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
-
- if (this.useServerTrackerForRetries) {
- // Server tracker allows us to do faster, and yet useful (hopefully), retries.
- // However, if we are too useful, we might fail very quickly due to retry count limit.
- // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
- // retry time if normal retries were used. Then we will retry until this time runs out.
- // If we keep hitting one server, the net effect will be the incremental backoff, and
- // essentially the same number of retries as planned. If we have to do faster retries,
- // we will do more retries in aggregate, but the user will be none the wiser.
- this.serverTrackerTimeout = 0;
- for (int i = 0; i < this.numTries; ++i) {
- serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
- }
+ // Server tracker allows us to do faster, and yet useful (hopefully), retries.
+ // However, if we are too useful, we might fail very quickly due to retry count limit.
+ // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
+ // retry time if normal retries were used. Then we will retry until this time runs out.
+ // If we keep hitting one server, the net effect will be the incremental backoff, and
+ // essentially the same number of retries as planned. If we have to do faster retries,
+ // we will do more retries in aggregate, but the user will be none the wiser.
+ this.serverTrackerTimeout = 0;
+ for (int i = 0; i < this.numTries; ++i) {
+ serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
}
+
this.rpcCallerFactory = rpcCaller;
}
@@ -291,7 +296,7 @@ class AsyncProcess<CResult> {
Iterator<? extends Row> it = rows.iterator();
while (it.hasNext()) {
Row r = it.next();
- HRegionLocation loc = findDestLocation(r, 1, posInList);
+ HRegionLocation loc = findDestLocation(r, posInList);
if (loc == null) { // loc is null if there is an error such as meta not available.
it.remove();
@@ -332,18 +337,17 @@ class AsyncProcess<CResult> {
* Find the destination.
*
* @param row the row
- * @param numAttempt the num attempt
* @param posInList the position in the list
* @return the destination. Null if we couldn't find it.
*/
- private HRegionLocation findDestLocation(Row row, int numAttempt, int posInList) {
- if (row == null) throw new IllegalArgumentException("row cannot be null");
+ private HRegionLocation findDestLocation(Row row, int posInList) {
+ if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
HRegionLocation loc = null;
IOException locationException = null;
try {
loc = hConnection.locateRegion(this.tableName, row.getRow());
if (loc == null) {
- locationException = new IOException("No location found, aborting submit for" +
+ locationException = new IOException("#" + id + ", no location found, aborting submit for" +
" tableName=" + tableName +
" rowkey=" + Arrays.toString(row.getRow()));
}
@@ -353,7 +357,7 @@ class AsyncProcess<CResult> {
if (locationException != null) {
// There are multiple retries in locateRegion already. No need to add new.
// We can't continue with this row, hence it's the last retry.
- manageError(numAttempt, posInList, row, false, locationException, null);
+ manageError(posInList, row, false, locationException, null);
return null;
}
@@ -460,12 +464,17 @@ class AsyncProcess<CResult> {
private void submit(List<Action<Row>> initialActions,
List<Action<Row>> currentActions, int numAttempt,
final HConnectionManager.ServerErrorTracker errorsByServer) {
+
+ if (numAttempt > 1){
+ retriesCnt.incrementAndGet();
+ }
+
// group per location => regions server
final Map<HRegionLocation, MultiAction<Row>> actionsByServer =
new HashMap<HRegionLocation, MultiAction<Row>>();
for (Action<Row> action : currentActions) {
- HRegionLocation loc = findDestLocation(action.getAction(), 1, action.getOriginalIndex());
+ HRegionLocation loc = findDestLocation(action.getAction(), action.getOriginalIndex());
if (loc != null) {
addAction(loc, action, actionsByServer);
}
@@ -503,7 +512,8 @@ class AsyncProcess<CResult> {
try {
res = createCaller(callable).callWithoutRetries(callable);
} catch (IOException e) {
- LOG.warn("Call to " + loc.getServerName() + " failed numAttempt=" + numAttempt +
+ LOG.warn("#" + id + ", call to " + loc.getServerName() +
+ " failed numAttempt=" + numAttempt +
", resubmitting all since not sure where we are at", e);
resubmitAll(initialActions, multiAction, loc, numAttempt + 1, e, errorsByServer);
return;
@@ -522,7 +532,7 @@ class AsyncProcess<CResult> {
// This should never happen. But as the pool is provided by the end user, let's secure
// this a little.
decTaskCounters(multiAction.getRegions(), loc.getServerName());
- LOG.warn("The task was rejected by the pool. This is unexpected." +
+ LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
" Server is " + loc.getServerName(), ree);
// We're likely to fail again, but this will increment the attempt counter, so it will
// finish.
@@ -551,7 +561,6 @@ class AsyncProcess<CResult> {
/**
* Check that we can retry acts accordingly: logs, set the error status, call the callbacks.
*
- * @param numAttempt the number of this attempt
* @param originalIndex the position in the list sent
* @param row the row
* @param canRetry if false, we won't retry whatever the settings.
@@ -559,13 +568,10 @@ class AsyncProcess<CResult> {
* @param location the location, if any (can be null)
* @return true if the action can be retried, false otherwise.
*/
- private boolean manageError(int numAttempt, int originalIndex, Row row, boolean canRetry,
+ private boolean manageError(int originalIndex, Row row, boolean canRetry,
Throwable throwable, HRegionLocation location) {
- if (canRetry) {
- if (numAttempt >= numTries ||
- (throwable != null && throwable instanceof DoNotRetryIOException)) {
- canRetry = false;
- }
+ if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) {
+ canRetry = false;
}
byte[] region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes();
@@ -608,15 +614,14 @@ class AsyncProcess<CResult> {
List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());
for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
for (Action<Row> action : e.getValue()) {
- if (manageError(numAttempt, action.getOriginalIndex(), action.getAction(),
- true, t, location)) {
+ if (manageError(action.getOriginalIndex(), action.getAction(), true, t, location)) {
toReplay.add(action);
}
}
}
if (toReplay.isEmpty()) {
- LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for all " +
+ LOG.warn("#" + id + ", attempt #" + numAttempt + "/" + numTries + " failed for all " +
initialActions.size() + " ops, NOT resubmitting, " + location.getServerName());
} else {
submit(initialActions, toReplay, numAttempt, errorsByServer);
@@ -628,7 +633,7 @@ class AsyncProcess<CResult> {
*
* @param initialActions - the whole action list
* @param rsActions - the actions for this location
- * @param location - the location
+ * @param location - the location. It's used as a server name.
* @param responses - the response, if any
* @param numAttempt - the attempt
*/
@@ -638,8 +643,8 @@ class AsyncProcess<CResult> {
HConnectionManager.ServerErrorTracker errorsByServer) {
if (responses == null) {
- LOG.info("Attempt #" + numAttempt + "/" + numTries + " failed all ops, trying resubmit," +
- location);
+ LOG.info("#" + id + ", attempt #" + numAttempt + "/" + numTries +
+ " failed all ops, trying resubmit," + location);
resubmitAll(initialActions, rsActions, location, numAttempt + 1, null, errorsByServer);
return;
}
@@ -670,14 +675,15 @@ class AsyncProcess<CResult> {
failureCount++;
if (!regionFailureRegistered) { // We're doing this once per location.
regionFailureRegistered= true;
+ // The location here is used as a server name.
hConnection.updateCachedLocations(this.tableName, row.getRow(), result, location);
- if (errorsByServer != null) {
+ if (failureCount == 1) {
errorsByServer.reportServerError(location);
- canRetry = errorsByServer.canRetryMore();
+ canRetry = errorsByServer.canRetryMore(numAttempt);
}
}
- if (manageError(numAttempt, correspondingAction.getOriginalIndex(), row, canRetry,
+ if (manageError(correspondingAction.getOriginalIndex(), row, canRetry,
throwable, location)) {
toReplay.add(correspondingAction);
}
@@ -694,21 +700,24 @@ class AsyncProcess<CResult> {
}
if (!toReplay.isEmpty()) {
- long backOffTime = (errorsByServer != null ?
- errorsByServer.calculateBackoffTime(location, pause) :
- ConnectionUtils.getPauseTime(pause, numAttempt));
- if (numAttempt > START_LOG_ERRORS_CNT && LOG.isDebugEnabled()) {
+ // We have two contradicting needs here:
+ // 1) We want to get the new location after having slept, as it may change.
+ // 2) We want to take into account the location when calculating the sleep time.
+ // It should be possible to have some heuristics to take the right decision. Short term,
+ // we go for one.
+ long backOffTime = errorsByServer.calculateBackoffTime(location, pause);
+ if (numAttempt > startLogErrorsCnt) {
// We use this value to have some logs when we have multiple failures, but not too many
// logs, as errors are to be expected when a region moves, splits and so on
- LOG.debug("Attempt #" + numAttempt + "/" + numTries + " failed " + failureCount +
- " ops , resubmitting " + toReplay.size() + ", " + location + ", last exception was: " +
- (throwable == null ? "null" : throwable.getMessage()) +
- ", sleeping " + backOffTime + "ms");
+ LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
+ location.getServerName(), throwable, backOffTime, true,
+ errorsByServer.getStartTrackingTime()));
}
+
try {
Thread.sleep(backOffTime);
} catch (InterruptedException e) {
- LOG.warn("Not sent: " + toReplay.size() + " operations, " + location, e);
+ LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + location, e);
Thread.interrupted();
return;
}
@@ -717,16 +726,46 @@ class AsyncProcess<CResult> {
} else {
if (failureCount != 0) {
// We have a failure but nothing to retry. We're done, it's a final failure..
- LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for " + failureCount +
- " ops on " + location.getServerName() + " NOT resubmitting. " + location);
- } else if (numAttempt > START_LOG_ERRORS_CNT + 1 && LOG.isDebugEnabled()) {
+ LOG.warn(createLog(numAttempt, failureCount, toReplay.size(),
+ location.getServerName(), throwable, -1, false,
+ errorsByServer.getStartTrackingTime()));
+ } else if (numAttempt > startLogErrorsCnt + 1) {
// The operation was successful, but needed several attempts. Let's log this.
- LOG.debug("Attempt #" + numAttempt + "/" + numTries + " finally suceeded, size=" +
- toReplay.size());
+ LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
+ location.getServerName(), throwable, -1, false,
+ errorsByServer.getStartTrackingTime()));
}
}
}
+ private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
+ Throwable error, long backOffTime, boolean willRetry, String startTime){
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("#").append(id).append(", table=").append(tableName).
+ append(", Attempt #").append(numAttempt).append("/").append(numTries).append(" ");
+
+ if (failureCount > 0 || error != null){
+ sb.append("failed ").append(failureCount).append(" ops").append(", last exception was: ").
+ append(error == null ? "null" : error.getMessage());
+ }else {
+ sb.append("SUCCEEDED");
+ }
+
+ sb.append(" on server ").append(sn);
+
+ sb.append(", tracking started at ").append(startTime);
+
+ if (willRetry) {
+ sb.append(" - retrying after sleeping for ").append(backOffTime).append(" ms").
+ append(", will replay ").append(replaySize).append(" ops.");
+ } else if (failureCount > 0) {
+ sb.append(" - FAILED, NOT RETRYING ANYMORE");
+ }
+
+ return sb.toString();
+ }
+
/**
* Waits for another task to finish.
* @param currentNumberOfTask - the number of task finished when calling the method.
@@ -738,7 +777,7 @@ class AsyncProcess<CResult> {
this.tasksDone.wait(100);
}
} catch (InterruptedException e) {
- throw new InterruptedIOException("Interrupted." +
+ throw new InterruptedIOException("#" + id + ", interrupted." +
" currentNumberOfTask=" + currentNumberOfTask +
", tableName=" + tableName + ", tasksDone=" + tasksDone.get());
}
@@ -756,9 +795,10 @@ class AsyncProcess<CResult> {
long now = EnvironmentEdgeManager.currentTimeMillis();
if (now > lastLog + 10000) {
lastLog = now;
- LOG.info(": Waiting for the global number of running tasks to be equals or less than "
+ LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
+ max + ", tasksSent=" + tasksSent.get() + ", tasksDone=" + tasksDone.get() +
- ", currentTasksDone=" + currentTasksDone + ", tableName=" + tableName);
+ ", currentTasksDone=" + currentTasksDone + ", retries=" + retriesCnt.get() +
+ " hasError=" + hasError() + ", tableName=" + tableName);
}
waitForNextTaskDone(currentTasksDone);
currentTasksDone = this.tasksDone.get();
@@ -848,10 +888,6 @@ class AsyncProcess<CResult> {
* @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
*/
protected HConnectionManager.ServerErrorTracker createServerErrorTracker() {
- if (useServerTrackerForRetries){
- return new HConnectionManager.ServerErrorTracker(this.serverTrackerTimeout);
- }else {
- return null;
- }
+ return new HConnectionManager.ServerErrorTracker(this.serverTrackerTimeout, this.numTries);
}
}
Modified: hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1536871&r1=1536870&r2=1536871&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.96/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue Oct 29 19:50:08 2013
@@ -24,6 +24,7 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.SocketException;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -2637,15 +2638,23 @@ public class HConnectionManager {
// We need a concurrent map here, as we could have multiple threads updating it in parallel.
private final ConcurrentMap<HRegionLocation, ServerErrors> errorsByServer =
new ConcurrentHashMap<HRegionLocation, ServerErrors>();
- private long canRetryUntil = 0;
+ private final long canRetryUntil;
+ private final int maxRetries;
+ private final String startTrackingTime;
- public ServerErrorTracker(long timeout) {
- LOG.trace("Server tracker timeout is " + timeout + "ms");
+ public ServerErrorTracker(long timeout, int maxRetries) {
+ this.maxRetries = maxRetries;
this.canRetryUntil = EnvironmentEdgeManager.currentTimeMillis() + timeout;
+ this.startTrackingTime = new Date().toString();
}
- boolean canRetryMore() {
- return EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil;
+ /**
+ * We stop retrying when we have exhausted BOTH the number of retries and the time allocated.
+ */
+ boolean canRetryMore(int numRetry) {
+ // If there is a single try we must not take into account the time.
+ return numRetry < maxRetries || (maxRetries > 1 &&
+ EnvironmentEdgeManager.currentTimeMillis() < this.canRetryUntil);
}
/**
@@ -2656,20 +2665,12 @@ public class HConnectionManager {
* @return The time to wait before sending next request.
*/
long calculateBackoffTime(HRegionLocation server, long basePause) {
- long result = 0;
+ long result;
ServerErrors errorStats = errorsByServer.get(server);
if (errorStats != null) {
- result = ConnectionUtils.getPauseTime(basePause, errorStats.retries);
- // Adjust by the time we already waited since last talking to this server.
- long now = EnvironmentEdgeManager.currentTimeMillis();
- long timeSinceLastError = now - errorStats.getLastErrorTime();
- if (timeSinceLastError > 0) {
- result = Math.max(0, result - timeSinceLastError);
- }
- // Finally, see if the backoff time overshoots the timeout.
- if (result > 0 && (now + result > this.canRetryUntil)) {
- result = Math.max(0, this.canRetryUntil - now);
- }
+ result = ConnectionUtils.getPauseTime(basePause, errorStats.retries.get());
+ } else {
+ result = 0; // yes, if the server is not in our list we don't wait before retrying.
}
return result;
}
@@ -2684,29 +2685,25 @@ public class HConnectionManager {
if (errors != null) {
errors.addError();
} else {
- errorsByServer.put(server, new ServerErrors());
+ errors = errorsByServer.putIfAbsent(server, new ServerErrors());
+ if (errors != null){
+ errors.addError();
+ }
}
}
+ String getStartTrackingTime() {
+ return startTrackingTime;
+ }
+
/**
* The record of errors for a server.
*/
private static class ServerErrors {
- public long lastErrorTime;
- public int retries;
-
- public ServerErrors() {
- this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
- this.retries = 0;
- }
+ public final AtomicInteger retries = new AtomicInteger(0);
public void addError() {
- this.lastErrorTime = EnvironmentEdgeManager.currentTimeMillis();
- ++this.retries;
- }
-
- public long getLastErrorTime() {
- return this.lastErrorTime;
+ retries.incrementAndGet();
}
}
}
Modified: hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1536871&r1=1536870&r2=1536871&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java (original)
+++ hbase/branches/0.96/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java Tue Oct 29 19:50:08 2013
@@ -679,7 +679,7 @@ public class TestAsyncProcess {
HTable ht = new HTable();
Configuration configuration = new Configuration(conf);
configuration.setBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
- configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
+ configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
// set default writeBufferSize
ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152));
@@ -688,24 +688,21 @@ public class TestAsyncProcess {
ht.ap = new MyAsyncProcess<Object>(mci, null, configuration);
- Assert.assertTrue(ht.ap.useServerTrackerForRetries);
Assert.assertNotNull(ht.ap.createServerErrorTracker());
- Assert.assertTrue(ht.ap.serverTrackerTimeout > 10000);
+ Assert.assertTrue(ht.ap.serverTrackerTimeout > 200);
ht.ap.serverTrackerTimeout = 1;
-
Put p = createPut(1, false);
ht.setAutoFlush(false, false);
ht.put(p);
- long start = System.currentTimeMillis();
try {
ht.flushCommits();
Assert.fail();
} catch (RetriesExhaustedWithDetailsException expected) {
}
- // Checking that the ErrorsServers came into play and made us stop immediately
- Assert.assertTrue((System.currentTimeMillis() - start) < 10000);
+ // Checking that the ErrorsServers came into play and didn't make us stop immediately
+ Assert.assertEquals(ht.ap.tasksSent.get(), 3);
}
/**
@@ -731,8 +728,7 @@ public class TestAsyncProcess {
MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
ht.connection = con;
- ht.batch(gets);
-
+ ht.batch(gets);
Assert.assertEquals(con.ap.nbActions.get(), NB_REGS);
Assert.assertEquals("1 multi response per server", 2, con.ap.nbMultiResponse.get());
Modified: hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1536871&r1=1536870&r2=1536871&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.96/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Oct 29 19:50:08 2013
@@ -510,9 +510,10 @@ public final class HConstants {
* run out of array items. Retries beyond this use the last number in the array. So, for
* example, if hbase.client.pause is 1 second, and maximum retries count
* hbase.client.retries.number is 10, we will retry at the following intervals:
- * 1, 2, 3, 10, 100, 100, 100, 100, 100, 100.
+ * 1, 2, 3, 5, 10, 20, 40, 100, 100, 100.
+ * With a 100ms pause, a back-off multiplier of 200 means a 20s wait.
*/
- public static int RETRY_BACKOFF[] = { 1, 2, 3, 5, 10, 100 };
+ public static int RETRY_BACKOFF[] = { 1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200 };
public static final String REGION_IMPL = "hbase.hregion.impl";
@@ -589,7 +590,7 @@ public final class HConstants {
/**
* Default value of {@link #HBASE_CLIENT_MAX_PERSERVER_TASKS}.
*/
- public static final int DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS = 5;
+ public static final int DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS = 2;
/**
* The maximum number of concurrent connections the client will maintain to a single
Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java?rev=1536871&r1=1536870&r2=1536871&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java Tue Oct 29 19:50:08 2013
@@ -93,13 +93,13 @@ public class ClusterStatusPublisher exte
* We want to limit the size of the protobuf message sent, do fit into a single packet.
* a reasonable size for ip / ethernet is less than 1Kb.
*/
- public static int MAX_SERVER_PER_MESSAGE = 10;
+ public final static int MAX_SERVER_PER_MESSAGE = 10;
/**
* If a server dies, we're sending the information multiple times in case a receiver misses the
* message.
*/
- public static int NB_SEND = 5;
+ public final static int NB_SEND = 5;
public ClusterStatusPublisher(HMaster master, Configuration conf,
Class<? extends Publisher> publisherClass)
Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1536871&r1=1536870&r2=1536871&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 29 19:50:08 2013
@@ -2541,7 +2541,13 @@ public class HRegion implements HeapSize
if (this.memstoreSize.get() > this.blockingMemStoreSize) {
requestFlush();
- throw new RegionTooBusyException("above memstore limit");
+ throw new RegionTooBusyException("Above memstore limit, " +
+ "regionName=" + (this.getRegionInfo() == null ? "unknown" :
+ this.getRegionInfo().getRegionNameAsString()) +
+ ", server=" + (this.getRegionServerServices() == null ? "unknown" :
+ this.getRegionServerServices().getServerName()) +
+ ", memstoreSize=" + memstoreSize.get() +
+ ", blockingMemStoreSize=" + blockingMemStoreSize);
}
}
@@ -5381,10 +5387,14 @@ public class HRegion implements HeapSize
throws RegionTooBusyException, InterruptedIOException {
try {
final long waitTime = Math.min(maxBusyWaitDuration,
- busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
+ busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
throw new RegionTooBusyException(
- "failed to get a lock in " + waitTime + "ms");
+ "failed to get a lock in " + waitTime + " ms. " +
+ "regionName=" + (this.getRegionInfo() == null ? "unknown" :
+ this.getRegionInfo().getRegionNameAsString()) +
+ ", server=" + (this.getRegionServerServices() == null ? "unknown" :
+ this.getRegionServerServices().getServerName()));
}
} catch (InterruptedException ie) {
LOG.info("Interrupted while waiting for a lock");
Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1536871&r1=1536870&r2=1536871&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Tue Oct 29 19:50:08 2013
@@ -870,7 +870,7 @@ public class TestHCM {
long timeBase = timeMachine.currentTimeMillis();
long largeAmountOfTime = ANY_PAUSE * 1000;
HConnectionManager.ServerErrorTracker tracker =
- new HConnectionManager.ServerErrorTracker(largeAmountOfTime);
+ new HConnectionManager.ServerErrorTracker(largeAmountOfTime, 100);
// The default backoff is 0.
assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
@@ -912,11 +912,11 @@ public class TestHCM {
// We also should not go over the boundary; last retry would be on it.
long timeLeft = (long)(ANY_PAUSE * 0.5);
timeMachine.setValue(timeBase + largeAmountOfTime - timeLeft);
- assertTrue(tracker.canRetryMore());
+ assertTrue(tracker.canRetryMore(1));
tracker.reportServerError(location);
assertEquals(timeLeft, tracker.calculateBackoffTime(location, ANY_PAUSE));
timeMachine.setValue(timeBase + largeAmountOfTime);
- assertFalse(tracker.canRetryMore());
+ assertFalse(tracker.canRetryMore(1));
} finally {
EnvironmentEdgeManager.reset();
}