You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/08/04 10:06:14 UTC

svn commit: r1369281 - in /hbase/branches/0.89-fb: conf/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/regionserver/

Author: mbautin
Date: Sat Aug  4 08:06:14 2012
New Revision: 1369281

URL: http://svn.apache.org/viewvc?rev=1369281&view=rev
Log:
[HBASE-6423] [0.89-fb]  Enable a configuration setting to throw exceptions instead of blocking RPC threads upon hitting the memstore limit.

Author: aaiyer

Summary:
Handle the exceptions received at the client end.
Allow the number of server-requested retries to be configured on the client.
Set the default to 0.

Test Plan:
  mr test; deployed to the Unicorn test cluster and tested the
performance under an intense workload.

Reviewers: kranganathan, liyintang

Reviewed By: kranganathan

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D532256

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOverloadedException.java
Modified:
    hbase/branches/0.89-fb/conf/hbase-site.xml
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Modified: hbase/branches/0.89-fb/conf/hbase-site.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/conf/hbase-site.xml?rev=1369281&r1=1369280&r2=1369281&view=diff
==============================================================================
--- hbase/branches/0.89-fb/conf/hbase-site.xml (original)
+++ hbase/branches/0.89-fb/conf/hbase-site.xml Sat Aug  4 08:06:14 2012
@@ -26,4 +26,5 @@
   <name>hbase.master.port</name>
   <value>60001</value>
 </property>
+
 </configuration>

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1369281&r1=1369280&r2=1369281&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Sat Aug  4 08:06:14 2012
@@ -243,6 +243,19 @@ public final class HConstants {
   public static final String HREGION_MEMSTORE_FLUSH_SIZE =
       "hbase.hregion.memstore.flush.size";
 
+  /**
+   * Conf key for the multiple of the memstore flush size at which a region
+   * blocks (or rejects) further updates.
+   */
+  public static final String HREGION_MEMSTORE_BLOCK_MULTIPLIER =
+      "hbase.hregion.memstore.block.multiplier";
+  /**
+   * Conf key: when true, writers wait while the memstore is over its blocking
+   * size; when false, the region throws RegionOverloadedException instead.
+   */
+  public static final String HREGION_MEMSTORE_WAIT_ON_BLOCK =
+      "hbase.hregion.memstore.block.waitonblock";
+
   /** Default size of a reservation block   */
   public static final int DEFAULT_SIZE_RESERVATION_BLOCK = 1024 * 1024 * 5;
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1369281&r1=1369280&r2=1369281&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Sat Aug  4 08:06:14 2012
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.ipc.HBase
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionOverloadedException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.MetaUtils;
 import org.apache.hadoop.hbase.util.SoftValueSortedMap;
@@ -386,6 +387,11 @@ public class HConnectionManager {
     // tables whose region cache prefetch are disabled.
     private final Set<Integer> regionCachePrefetchDisabledTables =
       new CopyOnWriteArraySet<Integer>();
+    // The number of times we will retry after receiving a RegionOverloadedException from the
+    // region server. The default of 0 (i.e. throw the exception and let the caller handle
+    // retries) may not always be what you want, but the HBaseThrift client this was created
+    // for does not want the thrift layer to hold up IPC threads while handling retries.
+    private int maxServerRequestedRetries;
 
     /**
      * constructor
@@ -413,6 +419,8 @@ public class HConnectionManager {
       this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
           HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
       this.numRetries = conf.getInt("hbase.client.retries.number", 10);
+      this.maxServerRequestedRetries =
+          conf.getInt("hbase.client.server.requested.retries.max", 0);
       this.rpcTimeout = conf.getInt(
           HConstants.HBASE_RPC_TIMEOUT_KEY,
           HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
@@ -1289,12 +1297,39 @@ public class HConnectionManager {
       List<Throwable> exceptions = new ArrayList<Throwable>();
 
       long callStartTime;
+      int serverRequestedRetries = 0;
       
       callStartTime = System.currentTimeMillis();
+      long serverRequestedWaitTime = 0;
       // do not retry if region cannot be located. There are enough retries
       // within instantiateRegionLocation.
       callable.instantiateRegionLocation(false /* reload cache? */);
       for(int tries = 0; ; tries++) {
+        // If server requested wait. We will wait for that time, and start
+        // again. Do not count this time/tries against the client retries.
+        if (serverRequestedWaitTime > 0) {
+          serverRequestedRetries++;
+
+          if (serverRequestedRetries > this.maxServerRequestedRetries)
+            throw new RetriesExhaustedException(callable.getServerName(),
+            callable.getRegionName(), callable.getRow(), serverRequestedRetries, exceptions);
+
+          long pauseTime = serverRequestedWaitTime + callStartTime
+              - System.currentTimeMillis();
+          LOG.debug("Got a BlockingWritesRetryLaterException: sleeping for " +
+              pauseTime +"ms. serverRequestedRetries = " + serverRequestedRetries);
+          try {
+            Thread.sleep(pauseTime);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
+          }
+
+          serverRequestedWaitTime = 0;
+          tries = 0;
+          callStartTime = System.currentTimeMillis();
+        }
+
         try {
           return getRegionServerWithoutRetries(callable, false);
         } catch (DoNotRetryIOException ioe) {
@@ -1312,6 +1347,12 @@ public class HConnectionManager {
           throw ioe;
         } catch (Throwable t) {
           exceptions.add(t);
+
+          if (t instanceof RegionOverloadedException) {
+            serverRequestedWaitTime = ((RegionOverloadedException)t).getBackoffTimeMillis();
+            continue;
+          }
+
           if (tries == numRetries - 1) {
             throw new RetriesExhaustedException(callable.getServerName(),
                 callable.getRegionName(), callable.getRow(), tries, exceptions);
@@ -1330,7 +1371,7 @@ public class HConnectionManager {
               equals(callable.location.getServerAddress())) {
             long pauseTime = getPauseTime(tries);
             if ((System.currentTimeMillis() - callStartTime + pauseTime) >
-                rpcRetryTimeout) {
+                 rpcRetryTimeout) {
               throw new RetriesExhaustedException(callable.getServerName(),
                   callable.getRegionName(), callable.getRow(), tries,
                   exceptions);
@@ -2060,6 +2101,8 @@ public class HConnectionManager {
         futures.add(task);
       }
 
+      RegionOverloadedException toThrow = null;
+      long maxWaitTimeRequested = 0;
       for (int i = 0; i < futures.size(); i++ ) {
         Future<MultiPutResponse> future = futures.get(i);
         MultiPut request = multiPuts.get(i);
@@ -2071,8 +2114,15 @@ public class HConnectionManager {
           throw new InterruptedIOException(e.getMessage());
         } catch (ExecutionException ex) {
           // retry, unless it is not to be retried.
-          if (ex.getCause() instanceof DoNotRetryIOException)
+          if (ex.getCause() instanceof DoNotRetryIOException) {
             throw (DoNotRetryIOException)ex.getCause();
+          } else if (ex.getCause() instanceof RegionOverloadedException) {
+            RegionOverloadedException roe = (RegionOverloadedException)ex.getCause();
+            if (roe.getBackoffTimeMillis() > maxWaitTimeRequested) {
+              maxWaitTimeRequested = roe.getBackoffTimeMillis();
+              toThrow = roe;
+            }
+          }
         }
 
         // For each region
@@ -2108,6 +2158,8 @@ public class HConnectionManager {
           }
         }
       }
+      if (toThrow != null) throw toThrow;
+
       return failed;
     }
 
@@ -2135,10 +2187,42 @@ public class HConnectionManager {
       callStartTime = System.currentTimeMillis();
 
       int tries;
+      long serverRequestedWaitTime = 0;
+      int serverRequestedRetries = 0;
       for ( tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
-        List<Put> failed;
+        // If server requested wait. We will wait for that time, and start
+        // again. Do not count this time/tries against the client retries.
+        if (serverRequestedWaitTime > 0) {
+          serverRequestedRetries++;
+
+          // Only do this for a configurable number of times?
+          if (serverRequestedRetries > this.maxServerRequestedRetries)
+            throw new RetriesExhaustedException("Server Overloaded: Still had "
+                + list.size() + " puts left after server requested " +
+                serverRequestedRetries + " retries.");
+
+          long sleepTimePending = callStartTime + serverRequestedWaitTime
+              - System.currentTimeMillis();
+            try {
+              Thread.sleep(sleepTimePending);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new InterruptedIOException();
+            }
+            tries = 0;
+            callStartTime = System.currentTimeMillis();
+            serverRequestedWaitTime = 0;
+        }
+
+        List<Put> failed = null;
         List<MultiPut> multiPuts = this.splitPutsIntoMultiPuts(list, tableName, options);
-        failed = this.processListOfMultiPut(multiPuts, tableName, options);
+        try {
+          failed = this.processListOfMultiPut(multiPuts, tableName, options);
+        } catch (RegionOverloadedException ex) {
+            serverRequestedWaitTime = ex.getBackoffTimeMillis();
+            // do not clear the list
+            continue;
+        }
 
         list.clear();
         if (failed != null && !failed.isEmpty()) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1369281&r1=1369280&r2=1369281&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Aug  4 08:06:14 2012
@@ -281,6 +281,7 @@ public class HRegion implements HeapSize
     = new ArrayList<Pair<Long,Long>>();
   final FlushRequester flushListener;
   private final long blockingMemStoreSize;
+  private final boolean waitOnMemstoreBlock;
   final long threadWakeFrequency;
   // Used to guard splits and closes
   private final ReentrantReadWriteLock splitsAndClosesLock =
@@ -431,6 +432,7 @@ public class HRegion implements HeapSize
   public HRegion(){
     this.tableDir = null;
     this.blockingMemStoreSize = 0L;
+    this.waitOnMemstoreBlock = true;
     this.conf = null;
     this.baseConf = null;
     this.flushListener = null;
@@ -525,7 +527,9 @@ public class HRegion implements HeapSize
     this.disableWAL = regionInfo.getTableDesc().isWALDisabled();
     this.memstoreFlushSize = flushSize;
     this.blockingMemStoreSize = this.memstoreFlushSize *
-      conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
+      conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 2);
+    this.waitOnMemstoreBlock =
+        conf.getBoolean(HConstants.HREGION_MEMSTORE_WAIT_ON_BLOCK, true);
     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
 
     this.readRequests =new RequestMetrics();
@@ -1888,7 +1892,6 @@ public class HRegion implements HeapSize
    */
   public void put(Put put, Integer lockid, boolean writeToWAL)
   throws IOException {
-    this.writeRequests.incrTotalRequstCount();
     checkReadOnly();
 
     // Do a rough check that we have resources to accept a write.  The check is
@@ -1896,6 +1899,7 @@ public class HRegion implements HeapSize
     // read lock, resources may run out.  For now, the thought is that this
     // will be extremely rare; we'll deal with it when it happens.
     checkResources();
+    this.writeRequests.incrTotalRequstCount();
     splitsAndClosesLock.readLock().lock();
 
     try {
@@ -2254,7 +2258,7 @@ public class HRegion implements HeapSize
    * this and the synchronize on 'this' inside in internalFlushCache to send
    * the notify.
    */
-  private void checkResources() {
+  private void checkResources() throws RegionOverloadedException {
 
     // If catalog region, do not impose resource constraints or block updates.
     if (this.getRegionInfo().isMetaRegion()) return;
@@ -2262,13 +2266,16 @@ public class HRegion implements HeapSize
     boolean blocked = false;
     while (this.memstoreSize.get() > this.blockingMemStoreSize) {
       requestFlush();
-      if (!blocked) {
-        LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
-          "' on region " + Bytes.toStringBinary(getRegionName()) +
+      String msg = "Region " + Bytes.toStringBinary(getRegionName()) +
           ": memstore size " +
           StringUtils.humanReadableInt(this.memstoreSize.get()) +
           " is >= than blocking " +
-          StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
+          StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size";
+      if (!this.waitOnMemstoreBlock) {
+        throw new RegionOverloadedException("Cannot accept mutations: " + msg, threadWakeFrequency);
+      } else if (!blocked) {
+        LOG.info("Blocking updates for '" + Thread.currentThread().getName()
+            + "'. "+ msg);
       }
       blocked = true;
       synchronized(this) {

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOverloadedException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOverloadedException.java?rev=1369281&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOverloadedException.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOverloadedException.java Sat Aug  4 08:06:14 2012
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.RegionException;
+
+/**
+ * Signals that a region refused a mutation because it is overloaded, e.g.
+ * HRegion#checkResources() when the memstore exceeds its blocking size and
+ * the region is configured not to make writers wait. Carries how long the
+ * server asks the client to back off before retrying.
+ */
+public class RegionOverloadedException extends RegionException {
+  private static final long serialVersionUID = -8436877560512061623L;
+
+  /** Back-off requested by the server, in milliseconds; 0 if not set. */
+  long backOffTime;
+
+  /** Default constructor; the requested back-off stays 0. */
+  public RegionOverloadedException() {
+    super();
+  }
+
+  /**
+   * @param s the detail message
+   * @param waitMillis how long, in milliseconds, the client is asked to back
+   *        off before retrying
+   */
+  public RegionOverloadedException(String s, long waitMillis) {
+    super(s);
+    this.backOffTime = waitMillis;
+  }
+
+  /** @return the back-off requested by the server, in milliseconds */
+  public long getBackoffTimeMillis() {
+    return backOffTime;
+  }
+}