You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/08/04 10:06:14 UTC
svn commit: r1369281 - in /hbase/branches/0.89-fb: conf/
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/regionserver/
Author: mbautin
Date: Sat Aug 4 08:06:14 2012
New Revision: 1369281
URL: http://svn.apache.org/viewvc?rev=1369281&view=rev
Log:
[HBASE-6423] [0.89-fb] Enable a configuration setting to throw exceptions instead of blocking RPC threads upon hitting the memstore limit.
Author: aaiyer
Summary:
- Handle RegionOverloadedException on the client side.
- Make the number of server-requested retries configurable on the client (hbase.client.server.requested.retries.max).
- Set that default to 0.
Test Plan:
mr test; deployed to the Unicorn test cluster and tested
performance under an intense workload.
Reviewers: kranganathan, liyintang
Reviewed By: kranganathan
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D532256
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOverloadedException.java
Modified:
hbase/branches/0.89-fb/conf/hbase-site.xml
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Modified: hbase/branches/0.89-fb/conf/hbase-site.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/conf/hbase-site.xml?rev=1369281&r1=1369280&r2=1369281&view=diff
==============================================================================
--- hbase/branches/0.89-fb/conf/hbase-site.xml (original)
+++ hbase/branches/0.89-fb/conf/hbase-site.xml Sat Aug 4 08:06:14 2012
@@ -26,4 +26,5 @@
<name>hbase.master.port</name>
<value>60001</value>
</property>
+
</configuration>
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1369281&r1=1369280&r2=1369281&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Sat Aug 4 08:06:14 2012
@@ -243,6 +243,13 @@ public final class HConstants {
public static final String HREGION_MEMSTORE_FLUSH_SIZE =
"hbase.hregion.memstore.flush.size";
+ /** Blocking memstore size is the memstore flush size multiplied by this value. */
+ public static final String HREGION_MEMSTORE_BLOCK_MULTIPLIER =
+ "hbase.hregion.memstore.block.multiplier";
+ /** If false, a region over its blocking memstore size rejects writes instead of blocking. */
+ public static final String HREGION_MEMSTORE_WAIT_ON_BLOCK =
+ "hbase.hregion.memstore.block.waitonblock";
+
/** Default size of a reservation block */
public static final int DEFAULT_SIZE_RESERVATION_BLOCK = 1024 * 1024 * 5;
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1369281&r1=1369280&r2=1369281&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Sat Aug 4 08:06:14 2012
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.ipc.HBase
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionOverloadedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MetaUtils;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
@@ -386,6 +387,11 @@ public class HConnectionManager {
// tables whose region cache prefetch are disabled.
private final Set<Integer> regionCachePrefetchDisabledTables =
new CopyOnWriteArraySet<Integer>();
+ // Maximum number of retries after a region server replies with a RegionOverloadedException
+ // ("hbase.client.server.requested.retries.max"). The default of 0 fails the call with
+ // RetriesExhaustedException as soon as a server requests a backoff, leaving retries to the
+ // caller. That may not suit every client, but the HBase Thrift client must not tie up IPC threads.
+ private int maxServerRequestedRetries;
/**
* constructor
@@ -413,6 +419,8 @@ public class HConnectionManager {
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numRetries = conf.getInt("hbase.client.retries.number", 10);
+ this.maxServerRequestedRetries =
+ conf.getInt("hbase.client.server.requested.retries.max", 0);
this.rpcTimeout = conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
@@ -1289,12 +1297,39 @@ public class HConnectionManager {
List<Throwable> exceptions = new ArrayList<Throwable>();
long callStartTime;
+ int serverRequestedRetries = 0;
callStartTime = System.currentTimeMillis();
+ long serverRequestedWaitTime = 0;
// do not retry if region cannot be located. There are enough retries
// within instantiateRegionLocation.
callable.instantiateRegionLocation(false /* reload cache? */);
for(int tries = 0; ; tries++) {
+ // If server requested wait. We will wait for that time, and start
+ // again. Do not count this time/tries against the client retries.
+ if (serverRequestedWaitTime > 0) {
+ serverRequestedRetries++;
+
+ if (serverRequestedRetries > this.maxServerRequestedRetries)
+ throw new RetriesExhaustedException(callable.getServerName(),
+ callable.getRegionName(), callable.getRow(), serverRequestedRetries, exceptions);
+
+ long pauseTime = serverRequestedWaitTime + callStartTime
+ - System.currentTimeMillis();
+ LOG.debug("Got a BlockingWritesRetryLaterException: sleeping for " +
+ pauseTime +"ms. serverRequestedRetries = " + serverRequestedRetries);
+ try {
+ Thread.sleep(pauseTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException();
+ }
+
+ serverRequestedWaitTime = 0;
+ tries = 0;
+ callStartTime = System.currentTimeMillis();
+ }
+
try {
return getRegionServerWithoutRetries(callable, false);
} catch (DoNotRetryIOException ioe) {
@@ -1312,6 +1347,12 @@ public class HConnectionManager {
throw ioe;
} catch (Throwable t) {
exceptions.add(t);
+
+ if (t instanceof RegionOverloadedException) {
+ serverRequestedWaitTime = ((RegionOverloadedException)t).getBackoffTimeMillis();
+ continue;
+ }
+
if (tries == numRetries - 1) {
throw new RetriesExhaustedException(callable.getServerName(),
callable.getRegionName(), callable.getRow(), tries, exceptions);
@@ -1330,7 +1371,7 @@ public class HConnectionManager {
equals(callable.location.getServerAddress())) {
long pauseTime = getPauseTime(tries);
if ((System.currentTimeMillis() - callStartTime + pauseTime) >
- rpcRetryTimeout) {
+ rpcRetryTimeout) {
throw new RetriesExhaustedException(callable.getServerName(),
callable.getRegionName(), callable.getRow(), tries,
exceptions);
@@ -2060,6 +2101,8 @@ public class HConnectionManager {
futures.add(task);
}
+ RegionOverloadedException toThrow = null;
+ long maxWaitTimeRequested = 0;
for (int i = 0; i < futures.size(); i++ ) {
Future<MultiPutResponse> future = futures.get(i);
MultiPut request = multiPuts.get(i);
@@ -2071,8 +2114,15 @@ public class HConnectionManager {
throw new InterruptedIOException(e.getMessage());
} catch (ExecutionException ex) {
// retry, unless it is not to be retried.
- if (ex.getCause() instanceof DoNotRetryIOException)
+ if (ex.getCause() instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException)ex.getCause();
+ } else if (ex.getCause() instanceof RegionOverloadedException) {
+ RegionOverloadedException roe = (RegionOverloadedException)ex.getCause();
+ if (roe.getBackoffTimeMillis() > maxWaitTimeRequested) {
+ maxWaitTimeRequested = roe.getBackoffTimeMillis();
+ toThrow = roe;
+ }
+ }
}
// For each region
@@ -2108,6 +2158,8 @@ public class HConnectionManager {
}
}
}
+ if (toThrow != null) throw toThrow;
+
return failed;
}
@@ -2135,10 +2187,42 @@ public class HConnectionManager {
callStartTime = System.currentTimeMillis();
int tries;
+ long serverRequestedWaitTime = 0;
+ int serverRequestedRetries = 0;
for ( tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
- List<Put> failed;
+ // If server requested wait. We will wait for that time, and start
+ // again. Do not count this time/tries against the client retries.
+ if (serverRequestedWaitTime > 0) {
+ serverRequestedRetries++;
+
+ // Only do this for a configurable number of times?
+ if (serverRequestedRetries > this.maxServerRequestedRetries)
+ throw new RetriesExhaustedException("Server Overloaded: Still had "
+ + list.size() + " puts left after server requested " +
+ serverRequestedRetries + " retries.");
+
+ long sleepTimePending = callStartTime + serverRequestedWaitTime
+ - System.currentTimeMillis();
+ try {
+ Thread.sleep(sleepTimePending);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException();
+ }
+ tries = 0;
+ callStartTime = System.currentTimeMillis();
+ serverRequestedWaitTime = 0;
+ }
+
+ List<Put> failed = null;
List<MultiPut> multiPuts = this.splitPutsIntoMultiPuts(list, tableName, options);
- failed = this.processListOfMultiPut(multiPuts, tableName, options);
+ try {
+ failed = this.processListOfMultiPut(multiPuts, tableName, options);
+ } catch (RegionOverloadedException ex) {
+ serverRequestedWaitTime = ex.getBackoffTimeMillis();
+ // do not clear the list
+ continue;
+ }
list.clear();
if (failed != null && !failed.isEmpty()) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1369281&r1=1369280&r2=1369281&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Aug 4 08:06:14 2012
@@ -281,6 +281,7 @@ public class HRegion implements HeapSize
= new ArrayList<Pair<Long,Long>>();
final FlushRequester flushListener;
private final long blockingMemStoreSize;
+ private final boolean waitOnMemstoreBlock;
final long threadWakeFrequency;
// Used to guard splits and closes
private final ReentrantReadWriteLock splitsAndClosesLock =
@@ -431,6 +432,7 @@ public class HRegion implements HeapSize
public HRegion(){
this.tableDir = null;
this.blockingMemStoreSize = 0L;
+ this.waitOnMemstoreBlock = true;
this.conf = null;
this.baseConf = null;
this.flushListener = null;
@@ -525,7 +527,9 @@ public class HRegion implements HeapSize
this.disableWAL = regionInfo.getTableDesc().isWALDisabled();
this.memstoreFlushSize = flushSize;
this.blockingMemStoreSize = this.memstoreFlushSize *
- conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
+ conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 2);
+ this.waitOnMemstoreBlock =
+ conf.getBoolean(HConstants.HREGION_MEMSTORE_WAIT_ON_BLOCK, true);
this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
this.readRequests =new RequestMetrics();
@@ -1888,7 +1892,6 @@ public class HRegion implements HeapSize
*/
public void put(Put put, Integer lockid, boolean writeToWAL)
throws IOException {
- this.writeRequests.incrTotalRequstCount();
checkReadOnly();
// Do a rough check that we have resources to accept a write. The check is
@@ -1896,6 +1899,7 @@ public class HRegion implements HeapSize
// read lock, resources may run out. For now, the thought is that this
// will be extremely rare; we'll deal with it when it happens.
checkResources();
+ this.writeRequests.incrTotalRequstCount();
splitsAndClosesLock.readLock().lock();
try {
@@ -2254,7 +2258,7 @@ public class HRegion implements HeapSize
* this and the synchronize on 'this' inside in internalFlushCache to send
* the notify.
*/
- private void checkResources() {
+ private void checkResources() throws RegionOverloadedException {
// If catalog region, do not impose resource constraints or block updates.
if (this.getRegionInfo().isMetaRegion()) return;
@@ -2262,13 +2266,16 @@ public class HRegion implements HeapSize
boolean blocked = false;
while (this.memstoreSize.get() > this.blockingMemStoreSize) {
requestFlush();
- if (!blocked) {
- LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
- "' on region " + Bytes.toStringBinary(getRegionName()) +
+ String msg = "Region " + Bytes.toStringBinary(getRegionName()) +
": memstore size " +
StringUtils.humanReadableInt(this.memstoreSize.get()) +
" is >= than blocking " +
- StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
+ StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size";
+ if (!this.waitOnMemstoreBlock) {
+ throw new RegionOverloadedException("Cannot accept mutations: " + msg, threadWakeFrequency);
+ } else if (!blocked) {
+ LOG.info("Blocking updates for '" + Thread.currentThread().getName()
+ + "'. "+ msg);
}
blocked = true;
synchronized(this) {
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOverloadedException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOverloadedException.java?rev=1369281&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOverloadedException.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOverloadedException.java Sat Aug 4 08:06:14 2012
@@ -0,0 +1,28 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.RegionException;
+
+/** Thrown by a region that cannot accept mutations right now; asks the client to back off. */
+public class RegionOverloadedException extends RegionException {
+ private static final long serialVersionUID = -8436877560512061623L;
+
+ /** Milliseconds the server asks the client to wait before retrying. */
+ private final long backOffTime;
+
+ /** Creates an exception with no message and no requested backoff. */
+ public RegionOverloadedException() {
+ super();
+ backOffTime = 0;
+ }
+
+ /** @param s message
+ * @param waitMillis milliseconds the client is asked to back off before retrying
+ */
+ public RegionOverloadedException(String s, long waitMillis) {
+ super(s);
+ backOffTime = waitMillis;
+ }
+
+ /** @return milliseconds the client should wait before retrying (0 if none requested) */
+ public long getBackoffTimeMillis() { return backOffTime; }
+}